[
https://issues.apache.org/jira/browse/CAMEL-3535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12981462#action_12981462
]
Brian Feaver commented on CAMEL-3535:
-------------------------------------
Even in your output the line I'm expecting is missing. Hope this clears up what
I'm seeing. Here's the output when it works:
{noformat}
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE
org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval
triggered for correlation key: true
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG
org.apache.camel.processor.aggregate.AggregateProcessor - Aggregation complete
for correlation key true sending aggregated exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG
org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated
exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE
org.apache.camel.processor.aggregate.AggregateProcessor - Aggregated exchange
onComplete: Exchange[Message: AB]
[main] INFO test.AggregationTest - Testing done: test.AggregationTest@77546dbc
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE
org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated
exchange: Exchange[Message: AB] complete.
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE
org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval
task complete
{noformat}
Here's the output when it doesn't:
{noformat}
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE
org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval
triggered for correlation key: true
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG
org.apache.camel.processor.aggregate.AggregateProcessor - Aggregation complete
for correlation key true sending aggregated exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG
org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated
exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE
org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated
exchange: Exchange[Message: AB] complete.
[main] INFO test.AggregationTest - Testing done: test.AggregationTest@614a75bb
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE
org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval
task complete
{noformat}
The difference between the two is the following line:
{noformat}
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE
org.apache.camel.processor.aggregate.AggregateProcessor - Aggregated exchange
onComplete: Exchange[Message: AB]
{noformat}
This line would be written to the log in
AggregateProcessor$AggregateOnCompletion.onComplete(), on or around line 544 of
AggregateProcessor.
{code}
public void onComplete(Exchange exchange) {
    if (LOG.isTraceEnabled()) {
        LOG.trace("Aggregated exchange onComplete: " + exchange);
    }
    // only confirm if we processed without a problem
    try {
        aggregationRepository.confirm(exchange.getContext(), exchangeId);
        // and remove redelivery state as well
        redeliveryState.remove(exchangeId);
    } finally {
        // must remember to remove in progress when we are complete
        inProgressCompleteExchanges.remove(exchangeId);
    }
}
{code}
Because onComplete() is never invoked in the failing case, the
inProgressCompleteExchanges.remove(exchangeId) call never runs. That is
precisely what's causing inProgressCompleteExchanges to grow larger and not get
cleaned up.
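To illustrate the effect in isolation, here is a minimal sketch that uses only the JDK. It is not Camel code: InProgressTracker, begin(), and InProgressLeakSketch are hypothetical names made up for this example, and the callback is modeled as a plain Runnable. It only assumes that IDs are added to a set when an aggregated exchange is sent and removed in the finally block of the completion callback, as in the snippet above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the aggregator's bookkeeping; NOT Camel code.
class InProgressTracker {
    private final Set<String> inProgress = new HashSet<>();

    // Called when an aggregated exchange is sent out (assumed behavior).
    void begin(String exchangeId) {
        inProgress.add(exchangeId);
    }

    // Same shape as onComplete() above: confirm, then always clean up.
    void onComplete(String exchangeId, Runnable confirm) {
        try {
            confirm.run();
        } finally {
            // the only place where an ID leaves the set
            inProgress.remove(exchangeId);
        }
    }

    int size() {
        return inProgress.size();
    }
}

class InProgressLeakSketch {
    // Sends `count` exchanges; the completion callback is invoked
    // only when fireCallback is true. Returns the IDs left behind.
    static int run(int count, boolean fireCallback) {
        InProgressTracker tracker = new InProgressTracker();
        for (int i = 0; i < count; i++) {
            String id = "exchange-" + i;
            tracker.begin(id);
            if (fireCallback) {
                tracker.onComplete(id, () -> { });
            }
        }
        return tracker.size();
    }

    public static void main(String[] args) {
        System.out.println("callback fired:   " + run(1000, true));
        System.out.println("callback skipped: " + run(1000, false));
    }
}
```

When the callback fires, the set is empty after every exchange. When it is skipped, every ID stays in the set, which matches the growth I'm seeing in inProgressCompleteExchanges.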
> Aggregation fails to call onComplete for exchanges if the aggregation is
> after a bean or process.
> -------------------------------------------------------------------------------------------------
>
> Key: CAMEL-3535
> URL: https://issues.apache.org/jira/browse/CAMEL-3535
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.5.0
> Reporter: Brian Feaver
> Assignee: Claus Ibsen
> Fix For: 2.6.0
>
> Attachments: AggregationTest.java
>
>
> When creating a route that contains an aggregation, if that aggregation is
> preceded by a bean or process, it will fail to call
> AggregateOnCompletion.onComplete(). I've attached a unit test that can show
> you the behavior. Trace level logging will need to be enabled to see the
> difference. With the call to the bean, it won't show the following log entry:
> {noformat}TRACE org.apache.camel.processor.aggregate.AggregateProcessor -
> Aggregated exchange onComplete: Exchange[Message: ab]{noformat}
> If you remove the bean call, it'll start calling onComplete() again.
> What I've noticed is that if this call is not made, it results in a memory
> leak since the inProgressCompleteExchanges HashSet in AggregateProcessor
> never has any exchange IDs removed.