Peter Nagy (Jr)
2018-10-16 07:16:27 UTC
Newbie question incoming.
I have a route that looks like
...
.setHeader("CamelMongoDbBatchSize", 128)
.to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterable")
.split(body())
.streaming()
...
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionSize(128)
.to("mongodb3:myothermongo?...&operation=bulkWrite")
Pardon any typos; I wrote this by hand since I'm using Camel from
Clojure, and the Clojure code looks a bit different.
This works as expected: the aggregation result is retrieved in batches,
sent downstream one by one, processed with various Processors, and
finally written out in batches.
I had a bug earlier where I had
.aggregate().body().completionSize(128).completionTimeout(30000)
which resulted in the aggregation always waiting for the timeout, even
though the Mongo aggregation query had finished processing everything
in the meantime.
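If I understand the aggregator correctly, the culprit was the
correlation key: body() puts each distinct document into its own
aggregation group, so no group ever reaches 128 exchanges and each one
just sits there until the 30s timeout fires. Side by side (hand-written
again, and I've added the strategy to the buggy variant so the snippet
is complete):

.aggregate(body(), new GroupedBodyAggregationStrategy())
    .completionSize(128)      // never reached: each unique body is its own group
    .completionTimeout(30000) // so this is what eventually completes every group

.aggregate(constant(true), new GroupedBodyAggregationStrategy())
    .completionSize(128)      // one shared group, fills up and completes by size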
This raised some questions for me.
1. My process needs to be memory-friendly. Even after fixing the bug, I
need to be sure the aggregation query won't fill up RAM when downstream
can't keep up. How can I apply backpressure, i.e. tell the route "don't
process more until someone downstream says you can", for example by
waiting whenever more than N exchanges are pending? (One idea I had is
sketched after these questions.)
2. Who was buffering the exchanges? I had ~2k entries flowing through;
where did they end up queued?
3. Did I miss a doc page where these questions are explained? I spent a
considerable amount of time searching for an answer, thinking "this
must be a common requirement, surely there's an example somewhere".
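For question 1, the closest mechanism I've found on my own is a bounded
SEDA queue between the splitter and the rest of the route: with
blockWhenFull=true the splitter blocks as soon as the queue holds its
configured maximum of pending exchanges, and since the split is
streaming, the MongoIterable should only be consumed as fast as the
queue drains. The endpoint names and the queue size below are made up;
please correct me if this isn't the idiomatic approach:

from("direct:export")
    .setHeader("CamelMongoDbBatchSize", 128)
    .to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterable")
    .split(body()).streaming()
        // block the splitter once 256 exchanges are waiting
        .to("seda:buffer?size=256&blockWhenFull=true");

from("seda:buffer")
    // ... various Processors ...
    .aggregate(constant(true), new GroupedBodyAggregationStrategy())
        .completionSize(128)
    .to("mongodb3:myothermongo?...&operation=bulkWrite");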
--
To reach a goal one has to enjoy the journey.