Discussion:
throttling/backpressure on a route
Peter Nagy (Jr)
2018-10-16 07:16:27 UTC
Permalink
Newbie question incoming.

I have a route that looks like

...
.setHeader("CamelMongoDbBatchSize", 128)
.to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterable")
.split(constant(true))
.streaming()
...
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionSize(128)
.to("mongodb3:myothermongo?...&operation=bulkWrite")

Pardon if there's some typos, I wrote this by hand since I'm using Camel
from clojure and the clojure code looks a bit different.

This works as expected, the aggregation is retrieved in batches and sent
downstream one-by-one, being processed with various Processors and
finally written in batches.

I had a bug before where I had

.aggregate().body().completionSize(128).completionTimeout(30000)

which resulted in the aggregation waiting for the timeout. However in
the meantime the mongo aggregation query finished processing everything.

This raised some questions for me.

1. My process needs to be memory-friendly. Even after fixing the bug I
need to be sure the aggregation query won't fill the RAM when downstream
can't keep up. How can I apply some backpressure? How can I tell the
route "don't process more until someone downstream says you can"? E.g.
saying if there's more than N exchanges pending, wait.

2. Who was buffering the exchanges? I had ~2k entries flowing through,
where did they end up queued?

3. Did I miss a doc page where these questions are explained? I spent a
considerable amount of time searching for an answer thinking "This must
be a common requirement, surely there's an example somewhere".

--
To reach a goal one has to enjoy the journey.
Onder SEZGIN
2018-10-16 11:57:53 UTC
Permalink
Hi,
I would suggest
1- Take a look at throttle-eip. (Make sure you understand split, aggregate
and mongodb3 producer adocs around mongodb aggregation. Try and see your
case unless you can explain or create a unit test to pinpoint your issue
and also check mongodb3 components test case for better examples.)
2- All in memory. Depending on your route design, they are either in
memoryaggregationrepository or flowing through downstream enpoints / routes.
3- For your example, take a look at split, aggreagte, mongodb3 aggregate
functionality and possibly, throttle or delay eips.
Post by Peter Nagy (Jr)
Newbie question incoming.
I have a route that looks like
...
.setHeader("CamelMongoDbBatchSize", 128)
.to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterable")
.split(constant(true))
.streaming()
...
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionSize(128)
.to("mongodb3:myothermongo?...&operation=bulkWrite")
Pardon if there's some typos, I wrote this by hand since I'm using Camel
from clojure and the clojure code looks a bit different.
This works as expected, the aggregation is retrieved in batches and sent
downstream one-by-one, being processed with various Processors and
finally written in batches.
I had a bug before where I had
.aggregate().body().completionSize(128).completionTimeout(30000)
which resulted in the aggregation waiting for the timeout. However in
the meantime the mongo aggregation query finished processing everything.
This raised some questions for me.
1. My process needs to be memory-friendly. Even after fixing the bug I
need to be sure the aggregation query won't fill the RAM when downstream
can't keep up. How can I apply some backpressure? How can I tell the
route "don't process more until someone downstream says you can"? E.g.
saying if there's more than N exchanges pending, wait.
2. Who was buffering the exchanges? I had ~2k entries flowing through,
where did they end up queued?
3. Did I miss a doc page where these questions are explained? I spent a
considerable amount of time searching for an answer thinking "This must
be a common requirement, surely there's an example somewhere".
--
To reach a goal one has to enjoy the journey.
Peter Nagy (Jr)
2018-10-17 14:08:34 UTC
Permalink
Hi Onder,
Post by Onder SEZGIN
Hi,
I would suggest
1- Take a look at throttle-eip. (Make sure you understand split, aggregate
and mongodb3 producer adocs around mongodb aggregation. Try and see your
case unless you can explain or create a unit test to pinpoint your issue
and also check mongodb3 components test case for better examples.)
I looked at the throttle EIP before, unfortunately this doesn't seem to
solve the issue at hand. What if the producer is producing more than
downstream can manage? Memory will be blasted. The throttler cannot know
how is downstream doing. Downstream needs to tell upstream if it's OK to
send more.
Post by Onder SEZGIN
2- All in memory. Depending on your route design, they are either in
memoryaggregationrepository or flowing through downstream enpoints / routes.
Understood, thanks.
Post by Onder SEZGIN
3- For your example, take a look at split, aggreagte, mongodb3 aggregate
functionality and possibly, throttle or delay eips.
As explained above none of these seem to solve the issue. There needs to
be communication between the producers. Having

A -> B -> C -> D -> E -> F

E needs to tell B "Wait" and when it catches up tell B "OK please send
more". Otherwise things will flow downstream, filling up memory.

Surely constraining memory usage is a solved problem? I'm just too new
to understand what pattern(s) solve it.
Post by Onder SEZGIN
Post by Peter Nagy (Jr)
Newbie question incoming.
I have a route that looks like
...
.setHeader("CamelMongoDbBatchSize", 128)
.to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterable")
.split(constant(true))
.streaming()
...
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionSize(128)
.to("mongodb3:myothermongo?...&operation=bulkWrite")
Pardon if there's some typos, I wrote this by hand since I'm using Camel
from clojure and the clojure code looks a bit different.
This works as expected, the aggregation is retrieved in batches and sent
downstream one-by-one, being processed with various Processors and
finally written in batches.
I had a bug before where I had
.aggregate().body().completionSize(128).completionTimeout(30000)
which resulted in the aggregation waiting for the timeout. However in
the meantime the mongo aggregation query finished processing everything.
This raised some questions for me.
1. My process needs to be memory-friendly. Even after fixing the bug I
need to be sure the aggregation query won't fill the RAM when downstream
can't keep up. How can I apply some backpressure? How can I tell the
route "don't process more until someone downstream says you can"? E.g.
saying if there's more than N exchanges pending, wait.
2. Who was buffering the exchanges? I had ~2k entries flowing through,
where did they end up queued?
3. Did I miss a doc page where these questions are explained? I spent a
considerable amount of time searching for an answer thinking "This must
be a common requirement, surely there's an example somewhere".
--
To reach a goal one has to enjoy the journey.
--
To reach a goal one has to enjoy the journey.
Valdis Andersons
2018-10-17 14:53:49 UTC
Permalink
Hi Peter,

Could you potentially use a message broker (RabbitMQ, ActiveMQ etc.) downstream from the aggregator that is causing you concern?

Mongo -> Aggregator -> Message Broker Queue -> Consumer -> ... -> Mongo

With the consumer counts it's quite easy to restrict throughput to the max your hardware can handle because consumers don't ask for a new message from a queue unless they've done processing the previous one.

We had to do something similar to some of our RESTful endpoints when some nightly batches had the potential to fire in a huge amount of requests but the downstream processors couldn't handle it.
It makes the route a bit more complex but we haven't had an out of memory error since (still working fast enough for us).


Regards,
Valdis

-----Original Message-----
From: Peter Nagy (Jr) [mailto:***@gratex.com]
Sent: 17 October 2018 15:09
To: ***@camel.apache.org
Subject: Re: throttling/backpressure on a route

Hi Onder,
Post by Onder SEZGIN
Hi,
I would suggest
1- Take a look at throttle-eip. (Make sure you understand split,
aggregate and mongodb3 producer adocs around mongodb aggregation. Try
and see your case unless you can explain or create a unit test to
pinpoint your issue and also check mongodb3 components test case for
better examples.)
I looked at the throttle EIP before, unfortunately this doesn't seem to solve the issue at hand. What if the producer is producing more than downstream can manage? Memory will be blasted. The throttler cannot know how is downstream doing. Downstream needs to tell upstream if it's OK to send more.
Post by Onder SEZGIN
2- All in memory. Depending on your route design, they are either in
memoryaggregationrepository or flowing through downstream enpoints / routes.
Understood, thanks.
Post by Onder SEZGIN
3- For your example, take a look at split, aggreagte, mongodb3
aggregate functionality and possibly, throttle or delay eips.
As explained above none of these seem to solve the issue. There needs to be communication between the producers. Having

A -> B -> C -> D -> E -> F

E needs to tell B "Wait" and when it catches up tell B "OK please send more". Otherwise things will flow downstream, filling up memory.

Surely constraining memory usage is a solved problem? I'm just too new to understand what pattern(s) solve it.
Post by Onder SEZGIN
Post by Peter Nagy (Jr)
Newbie question incoming.
I have a route that looks like
...
.setHeader("CamelMongoDbBatchSize", 128)
.to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterabl
e")
.split(constant(true))
.streaming()
...
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionSize(128)
.to("mongodb3:myothermongo?...&operation=bulkWrite")
Pardon if there's some typos, I wrote this by hand since I'm using
Camel from clojure and the clojure code looks a bit different.
This works as expected, the aggregation is retrieved in batches and
sent downstream one-by-one, being processed with various Processors
and finally written in batches.
I had a bug before where I had
.aggregate().body().completionSize(128).completionTimeout(30000)
which resulted in the aggregation waiting for the timeout. However in
the meantime the mongo aggregation query finished processing everything.
This raised some questions for me.
1. My process needs to be memory-friendly. Even after fixing the bug
I need to be sure the aggregation query won't fill the RAM when
downstream can't keep up. How can I apply some backpressure? How can
I tell the route "don't process more until someone downstream says you can"? E.g.
saying if there's more than N exchanges pending, wait.
2. Who was buffering the exchanges? I had ~2k entries flowing
through, where did they end up queued?
3. Did I miss a doc page where these questions are explained? I spent
a considerable amount of time searching for an answer thinking "This
must be a common requirement, surely there's an example somewhere".
--
To reach a goal one has to enjoy the journey.
--
To reach a goal one has to enjoy the journey.
Peter Nagy (Jr)
2018-10-17 15:22:45 UTC
Permalink
Hi Valdis,

and your MQ is throttling how? Serializing it to disk? Or is it keeping
it in RAM but running on a different machine?

It seems to be a bit of an overkill :)


I just found ThrottlingInflightRoutePolicy, reading its description it
seems to be what I'm looking for. I will double-check tomorrow.

Thanks for all the tips so far.
Post by Valdis Andersons
Hi Peter,
Could you potentially use a message broker (RabbitMQ, ActiveMQ etc.) downstream from the aggregator that is causing you concern?
Mongo -> Aggregator -> Message Broker Queue -> Consumer -> ... -> Mongo
With the consumer counts it's quite easy to restrict throughput to the max your hardware can handle because consumers don't ask for a new message from a queue unless they've done processing the previous one.
We had to do something similar to some of our RESTful endpoints when some nightly batches had the potential to fire in a huge amount of requests but the downstream processors couldn't handle it.
It makes the route a bit more complex but we haven't had an out of memory error since (still working fast enough for us).
Regards,
Valdis
-----Original Message-----
Sent: 17 October 2018 15:09
Subject: Re: throttling/backpressure on a route
Hi Onder,
Post by Onder SEZGIN
Hi,
I would suggest
1- Take a look at throttle-eip. (Make sure you understand split,
aggregate and mongodb3 producer adocs around mongodb aggregation. Try
and see your case unless you can explain or create a unit test to
pinpoint your issue and also check mongodb3 components test case for
better examples.)
I looked at the throttle EIP before, unfortunately this doesn't seem to solve the issue at hand. What if the producer is producing more than downstream can manage? Memory will be blasted. The throttler cannot know how is downstream doing. Downstream needs to tell upstream if it's OK to send more.
Post by Onder SEZGIN
2- All in memory. Depending on your route design, they are either in
memoryaggregationrepository or flowing through downstream enpoints / routes.
Understood, thanks.
Post by Onder SEZGIN
3- For your example, take a look at split, aggreagte, mongodb3
aggregate functionality and possibly, throttle or delay eips.
As explained above none of these seem to solve the issue. There needs to be communication between the producers. Having
A -> B -> C -> D -> E -> F
E needs to tell B "Wait" and when it catches up tell B "OK please send more". Otherwise things will flow downstream, filling up memory.
Surely constraining memory usage is a solved problem? I'm just too new to understand what pattern(s) solve it.
Post by Onder SEZGIN
Post by Peter Nagy (Jr)
Newbie question incoming.
I have a route that looks like
...
.setHeader("CamelMongoDbBatchSize", 128)
.to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterabl
e")
.split(constant(true))
.streaming()
...
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionSize(128)
.to("mongodb3:myothermongo?...&operation=bulkWrite")
Pardon if there's some typos, I wrote this by hand since I'm using
Camel from clojure and the clojure code looks a bit different.
This works as expected, the aggregation is retrieved in batches and
sent downstream one-by-one, being processed with various Processors
and finally written in batches.
I had a bug before where I had
.aggregate().body().completionSize(128).completionTimeout(30000)
which resulted in the aggregation waiting for the timeout. However in
the meantime the mongo aggregation query finished processing everything.
This raised some questions for me.
1. My process needs to be memory-friendly. Even after fixing the bug
I need to be sure the aggregation query won't fill the RAM when
downstream can't keep up. How can I apply some backpressure? How can
I tell the route "don't process more until someone downstream says you can"? E.g.
saying if there's more than N exchanges pending, wait.
2. Who was buffering the exchanges? I had ~2k entries flowing
through, where did they end up queued?
3. Did I miss a doc page where these questions are explained? I spent
a considerable amount of time searching for an answer thinking "This
must be a common requirement, surely there's an example somewhere".
--
To reach a goal one has to enjoy the journey.
--
To reach a goal one has to enjoy the journey.
Valdis Andersons
2018-10-17 15:50:23 UTC
Permalink
Hi Peter,

Yes, it's running on a different machine that's dedicated to RabbitMQ and we are serializing the messages to disk - exchange.getIn().setHeader("rabbitmq.DELIVERY_MODE", 2), though I'm not sure if that header is actually needed for the broker to manage its own memory usage (we set it for the purpose of retaining messages during broker restarts). Our test broker is running on a puny little 4BG RAM VM and has had no issues with running out of memory during the batch runs that killed the REST endpoints.


Regards,
Valdis

-----Original Message-----
From: Peter Nagy (Jr) [mailto:***@gratex.com]
Sent: 17 October 2018 16:23
To: ***@camel.apache.org
Subject: Re: throttling/backpressure on a route

Hi Valdis,

and your MQ is throttling how? Serializing it to disk? Or is it keeping it in RAM but running on a different machine?

It seems to be a bit of an overkill :)


I just found ThrottlingInflightRoutePolicy, reading its description it seems to be what I'm looking for. I will double-check tomorrow.

Thanks for all the tips so far.
Post by Valdis Andersons
Hi Peter,
Could you potentially use a message broker (RabbitMQ, ActiveMQ etc.) downstream from the aggregator that is causing you concern?
Mongo -> Aggregator -> Message Broker Queue -> Consumer -> ... -> Mongo
With the consumer counts it's quite easy to restrict throughput to the max your hardware can handle because consumers don't ask for a new message from a queue unless they've done processing the previous one.
We had to do something similar to some of our RESTful endpoints when some nightly batches had the potential to fire in a huge amount of requests but the downstream processors couldn't handle it.
It makes the route a bit more complex but we haven't had an out of memory error since (still working fast enough for us).
Regards,
Valdis
-----Original Message-----
Sent: 17 October 2018 15:09
Subject: Re: throttling/backpressure on a route
Hi Onder,
Post by Onder SEZGIN
Hi,
I would suggest
1- Take a look at throttle-eip. (Make sure you understand split,
aggregate and mongodb3 producer adocs around mongodb aggregation. Try
and see your case unless you can explain or create a unit test to
pinpoint your issue and also check mongodb3 components test case for
better examples.)
I looked at the throttle EIP before, unfortunately this doesn't seem to solve the issue at hand. What if the producer is producing more than downstream can manage? Memory will be blasted. The throttler cannot know how is downstream doing. Downstream needs to tell upstream if it's OK to send more.
Post by Onder SEZGIN
2- All in memory. Depending on your route design, they are either in
memoryaggregationrepository or flowing through downstream enpoints / routes.
Understood, thanks.
Post by Onder SEZGIN
3- For your example, take a look at split, aggreagte, mongodb3
aggregate functionality and possibly, throttle or delay eips.
As explained above none of these seem to solve the issue. There needs
to be communication between the producers. Having
A -> B -> C -> D -> E -> F
E needs to tell B "Wait" and when it catches up tell B "OK please send more". Otherwise things will flow downstream, filling up memory.
Surely constraining memory usage is a solved problem? I'm just too new to understand what pattern(s) solve it.
Post by Onder SEZGIN
Post by Peter Nagy (Jr)
Newbie question incoming.
I have a route that looks like
...
.setHeader("CamelMongoDbBatchSize", 128)
.to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterab
l
e")
.split(constant(true))
.streaming()
...
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionSize(128)
.to("mongodb3:myothermongo?...&operation=bulkWrite")
Pardon if there's some typos, I wrote this by hand since I'm using
Camel from clojure and the clojure code looks a bit different.
This works as expected, the aggregation is retrieved in batches and
sent downstream one-by-one, being processed with various Processors
and finally written in batches.
I had a bug before where I had
.aggregate().body().completionSize(128).completionTimeout(30000)
which resulted in the aggregation waiting for the timeout. However
in the meantime the mongo aggregation query finished processing everything.
This raised some questions for me.
1. My process needs to be memory-friendly. Even after fixing the bug
I need to be sure the aggregation query won't fill the RAM when
downstream can't keep up. How can I apply some backpressure? How can
I tell the route "don't process more until someone downstream says you can"? E.g.
saying if there's more than N exchanges pending, wait.
2. Who was buffering the exchanges? I had ~2k entries flowing
through, where did they end up queued?
3. Did I miss a doc page where these questions are explained? I
spent a considerable amount of time searching for an answer thinking
"This must be a common requirement, surely there's an example somewhere".
--
To reach a goal one has to enjoy the journey.
--
To reach a goal one has to enjoy the journey.

Loading...