Discussion: count of processed messages when using aggregation
Olaf
2013-11-05 11:57:41 UTC
Hello,

is there an easy way to count and sum all lines processed by an aggregator?
Suppose my file has 9999 lines, and the route splits it into 100-line chunks
and sends them to a remote system. The goal is to gather statistics on all
sent lines. In the example below, .log() always prints the aggregation
size (100 or less), not the overall number of processed messages, which
should be 9999:

@Override
public void configure() throws Exception {
    from("file:input.csv")
        .unmarshal().csv()
        .split(body()).streaming().parallelProcessing()
            .bean("myProcessor", "doWork")
            .aggregate(constant("id"), new Aggregator())
                .completionSize(100).completionTimeout(1000)
                .parallelProcessing()
            .to("remote")
            .log("Sent ${header.RecordsCounter} records to remote");
}

Thanks in advance!
Taariq Levack
2013-11-05 12:53:04 UTC
Hi

I think your split size is actually 9999; try using the CamelSplitSize
value set by the split.
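
As a side note, in Camel 2.x the splitter exposes these values as exchange
properties (CamelSplitIndex, CamelSplitSize); with streaming(), CamelSplitSize
is only populated on the final exchange. A minimal sketch of reading them:

from("file:input.csv")
    .unmarshal().csv()
    .split(body()).streaming()
        // with streaming(), ${property.CamelSplitSize} is empty until the
        // last exchange, so only the final log line shows the total
        .log("split item ${property.CamelSplitIndex} of ${property.CamelSplitSize}");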

Taariq
Olaf
2013-11-05 13:48:36 UTC
Hello,

thanks! CamelSplitSize is useful. I'd then add:

.to("remote")
.choice()
.when(property("CamelSplitComplete").isEqualTo("true"))
.log("splitted ${property.CamelSplitSize} records")
.otherwise()
.log("lfile not completed yet");

But what if some messages fail or are filtered out and never reach the
remote system? How do I sum those?
Taariq Levack
2013-11-05 15:41:31 UTC
The Aggregator provides a CamelAggregatedSize value; maybe try Simple [1] to set a header with the sum.
[1] http://camel.apache.org/simple.html
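
For illustration, a rough sketch of an AggregationStrategy that keeps a
running count in a custom header. This assumes the Aggregator class in
Olaf's route implements Camel 2.x's AggregationStrategy; "RecordsCounter"
is the header name from his .log() line, not a Camel built-in:

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Sketch: count every exchange folded into the aggregate and expose the
// total as a "RecordsCounter" header on the combined exchange.
public class Aggregator implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // first exchange in this aggregation group
            newExchange.getIn().setHeader("RecordsCounter", 1);
            return newExchange;
        }
        int count = oldExchange.getIn().getHeader("RecordsCounter", Integer.class);
        oldExchange.getIn().setHeader("RecordsCounter", count + 1);
        // ... merge the bodies here as needed, e.g. append the new CSV row ...
        return oldExchange;
    }
}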
Olaf
2013-11-13 16:32:39 UTC
Hello,

I still haven't found a solution. Somehow I need to know when all of the
aggregated chunks have been completed:

.aggregate(constant("id"), new Aggregator())
    .completionSize(100).completionTimeout(10000)
    .to("remote")

and after that I could log that the file is complete.

Ideally I'd use .groupExchanges() and re-gather the aggregated chunks into
a single exchange, but that doesn't compile.
cristisor
2013-11-13 18:36:30 UTC
You could set a count header on each exchange that leaves
.bean("myProcessor", "doWork") and, at the end, log the number from the last
exchange. Alternatively, have .bean("myProcessor", "doWork") increase an
internal counter; when you receive CamelSplitComplete, call the myProcessor
bean again to log the counter value, then reset it so that a new CSV can
start from 0 (a sketch of such a bean follows the route below).
Something like this:
from("file:input.csv")

.unmarshal().csv().split(body()).streaming().parallelProcessing()
.bean("myProcessor", "doWork") // inside the doWork method
you increase the counter
.aggregate(constant("id"), new
Aggregator()).completionSize(100).completionTimeout(1000)
.parallelProcessing() // why would you need this one?
.to("remote")
.choice()
.when(property("CamelSplitComplete").isEqualTo("true"))
.bean("myProcessor", "logCounterAndResetCounter")
.otherwise()
.log("lfile not completed yet");

If sending to the remote server is time-consuming and you need throughput,
you could do something like this to increase performance:
.aggregate(constant("id"), new Aggregator())
    .completionSize(100).completionTimeout(1000)
    .to("seda:queueName");

from("seda:queueName")
    .to("remote");
This puts the aggregated exchanges on another thread, which takes care of
the sending and logging while the initial thread continues to process CSV
lines without having to wait for the remote machine to acknowledge the
aggregated exchanges.
Olaf
2013-11-13 19:55:16 UTC
Suppose I have 102 lines and the first 100 lines need more than 1 second of
processing time. Then the last 2 lines arrive at .to("remote") as the first
aggregated chunk, and that exchange has the CamelSplitComplete property set,
which is misleading here because the first 100 lines are not yet fully
processed. I cannot log the file as processed until all aggregated chunks
have passed through .to("remote").
cristisor
2013-11-13 20:14:24 UTC
This problem can be solved with a stream resequencer
(http://camel.apache.org/resequencer.html - stream resequencing).
It reorders the exchanges after processing and sends them on to the
aggregator. Reorder them so that the exchange with the split-complete flag
is always the last one; that way you can remove the timeout on the
aggregator and add a predicate:
completionPredicate(property("CamelSplitComplete").isEqualTo(true))
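
Putting that suggestion together, the route could look roughly like this
(an untested sketch assuming Camel 2.x property expressions; whether the
split-complete flag survives aggregation depends on the Aggregator
strategy, as noted in the comments):

from("file:input.csv")
    .unmarshal().csv()
    .split(body()).streaming().parallelProcessing()
        .bean("myProcessor", "doWork")
        // restore the original order so the exchange carrying
        // CamelSplitComplete reaches the aggregator last
        .resequence(property("CamelSplitIndex")).stream()
        .aggregate(constant("id"), new Aggregator())
            // completes on a full chunk, or on the final (partial) one;
            // note: the Aggregator must copy the CamelSplitComplete
            // property onto the exchange it returns, or the predicate
            // will never see the flag
            .completionSize(100)
            .completionPredicate(property("CamelSplitComplete").isEqualTo(true))
        .to("remote")
        .log("Sent ${header.RecordsCounter} records to remote");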
Olaf
2013-11-13 20:27:10 UTC
I'll try that. However, it will probably make the overall process slower,
because the aggregator must wait until all split messages are processed.
Anyway, thanks for the help!
cristisor
2013-11-13 20:37:13 UTC
That's the beauty of the stream resequencer: it lets the messages go through
one by one as they get ordered, so the speed difference for the overall
process is very small. The batch resequencer, by contrast, waits for all the
exchanges to complete and only then sends them to the aggregator, which
would add delay.
Another way would be to count how many messages arrive in the bean and hold
the last one back just before aggregation until all the rest are aggregated,
but you might end up with pretty complex routes.
Olaf
2013-11-14 14:36:31 UTC
There is also onCompletion(), but it is triggered right after the aggregator
has processed the last message, not after the route has completed the last
message. Isn't that a bug?

Anyway, it wouldn't carry the counter in question; I'd only know that the
route has finished.


@Override
public void configure() throws Exception {
    from("file:input.csv")
        .onCompletion()
            .process(new EndProcessor())
            .to("log:sync")
        .end()

        .unmarshal().csv()
        .split(body()).streaming().parallelProcessing()
            .bean("myProcessor", "doWork")
            .aggregate(constant("id"), new Aggregator())
                .completionSize(100).completionTimeout(1000)
            .to("remote")
            .log("Sent ${header.RecordsCounter} records to remote");
}
