This appears to be a bug. The attached test case is failing at line #95 The issue appears to be around the following line of code:
onCompletion(batchKey, originalExchange, batchAnswer, false, aggregateFailed); https://github.com/apache/camel/blob/cd5c790e18f00288d1ac62aca909efb99a7a4846/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java#L633 This is called for all the correlation keys with the same originalExchange. The method onCompletion sets the property CamelAggregatedCorrelationKey on originalMessage. So originalMessage is left with the last update for this key. orginalMessage is (appears to be) the stored aggregate for the current message that triggered completion. It does not appear to have a reason to be associated with other keys. On Monday, 20 November, 2023 at 11:01:15 am GST, Dinu Pavithran <din...@yahoo.com.invalid> wrote: I am seeing wrong correlation key is set for the first exchange emitted from aggregate EIP. In the log below, Exchange[7F40FFF506AA26C-000000000000000D] is aggregating with correlation key: batch-1, but when this exchange is emitted, the property CamelAggregatedCorrelationKey is set as batch-4 I saw the same result with Camel v 4.2.0 run using camel-jbang and Camel v 3.18.4 using camel-main started from Groovy or Java I also saw the same result when using different aggregation strategies. Is there something wrong with the way I have configured the EIP? --- zip-files-2.yaml --- - route: id: zip-files from: uri: file:{{indir}} parameters: sort-by: reverse:file:modified #last modified will be the first pre-sort: true recursive: true max-depth: 3 min-depth: 3 antInclude: batches/** antExclude: '*/processing-*/*' include-ext: csv repeatCount: 1 synchronous: true steps: - setHeader: name: Batch simple: ${header.CamelFileParent.replaceFirst('{{inDir}}/batches/','')} - aggregate: correlation-expression: header: Batch aggregation-strategy: '#class:org.apache.camel.processor.aggregate.zipfile.ZipAggregationStrategy' completion-from-batch-consumer: true eager-check-completion: true #simplified - setHeader: name: LatestBatch constant: batch-4 - setHeader: name: RunID constant: 20231120063433118 - setHeader: name: Batch exchangeProperty: CamelAggregatedCorrelationKey - log: loggingLevel: debug message: Processing batch ${headers.Batch} from exchange ${exchangeId} - filter: simple: ${headers.LatestBatch} == ${headers.Batch} steps: # for tracing - to: file:{{indir}}/{{sentDir}}?file-name=Processing-${header.RunID}-${header.Batch}.zip - log: Wrote ${headers.Batch} to ${header.CamelFileNameProduced} - log: Send ${headers.Batch} --- tree {{indir}}/batches -- in ├── batches │ ├── batch-1 │ │ ├── events.csv │ │ └── users.csv │ ├── batch-2 │ │ ├── events.csv │ │ └── users.csv │ ├── batch-3 │ │ ├── events.csv │ │ └── users.csv │ ├── batch-4 │ │ ├── events.csv │ │ └── users.csv │ └── processing-batch-5 │ ├── events.csv │ └── users.csv -- log --- 06:34:29.845 INFO [ main] mel.main.MainSupport : Apache Camel (JBang) 4.2.0 is starting 06:34:30.061 INFO [ main] mel.main.MainSupport : Using Java 17.0.8.1 with PID 481790. Started by vscode in /home/vscode/camel 06:34:30.956 INFO [ main] main.BaseMainSupport : Auto-configuration summary 06:34:30.956 INFO [ main] main.BaseMainSupport : [application.properties] camel.main.durationMaxIdleSeconds=1 06:34:30.956 INFO [ main] main.BaseMainSupport : [application.properties] camel.main.shutdownTimeout=5 06:34:30.957 INFO [ main] main.BaseMainSupport : [application.properties] camel.server.enabled=false 06:34:30.957 INFO [ main] main.BaseMainSupport : [application.properties] camel.server.healthCheckEnabled=true 06:34:30.957 INFO [ main] main.BaseMainSupport : [application.properties] camel.server.devConsoleEnabled=true 06:34:30.958 INFO [ main] main.BaseMainSupport : [application.properties] camel.health.enabled=false 06:34:30.958 INFO [ main] main.BaseMainSupport : [application.properties] camel.health.exposureLevel=full 06:34:31.364 INFO [ main] or.LocalCliConnector : Camel CLI enabled (local) 06:34:31.744 INFO [ main] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) is starting 06:34:31.915 INFO [ main] e.AggregateProcessor : Defaulting to MemoryAggregationRepository 06:34:32.058 INFO [ main] main.BaseMainSupport : Property-placeholders summary 06:34:32.058 INFO [ main] main.BaseMainSupport : [application.properties] indir=in 06:34:32.059 INFO [ main] main.BaseMainSupport : [application.properties] batchDir=in/batches 06:34:32.059 INFO [ main] main.BaseMainSupport : [application.properties] sentDir=send 06:34:32.060 INFO [ main] main.BaseMainSupport : [application.properties] skippedDir=skipped 06:34:32.096 INFO [ main] AbstractCamelContext : Routes startup (started:1) 06:34:32.097 INFO [ main] AbstractCamelContext : Started zip-files (file://in) 06:34:32.097 INFO [ main] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) started in 352ms (build:0ms init:0ms start:352ms) 06:34:32.100 INFO [ main] mel.main.MainSupport : Waiting until complete: Duration idle 1 seconds 06:34:33.140 DEBUG [ file://in] zip-files-2.yaml:62 : File batches/batch-4/events.csv is for batch batch-4 06:34:33.157 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-4 06:34:33.185 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000001] with correlation key: batch-4 06:34:33.186 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end +++ with correlation key: batch-4 06:34:33.205 DEBUG [ file://in] zip-files-2.yaml:62 : File batches/batch-4/users.csv is for batch batch-4 06:34:33.214 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-4 06:34:33.217 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000001], newExchange: Exchange[7F40FFF506AA26C-0000000000000001] with correlation key: batch-4 06:34:33.217 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end +++ with correlation key: batch-4 06:34:33.232 DEBUG [ file://in] zip-files-2.yaml:62 : File batches/batch-3/users.csv is for batch batch-3 06:34:33.238 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-3 06:34:33.242 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000005] with correlation key: batch-3 06:34:33.242 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end +++ with correlation key: batch-3 06:34:33.261 DEBUG [ file://in] zip-files-2.yaml:62 : File batches/batch-3/events.csv is for batch batch-3 06:34:33.273 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-3 06:34:33.276 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000005], newExchange: Exchange[7F40FFF506AA26C-0000000000000005] with correlation key: batch-3 06:34:33.277 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end +++ with correlation key: batch-3 06:34:33.291 DEBUG [ file://in] zip-files-2.yaml:62 : File batches/batch-2/events.csv is for batch batch-2 06:34:33.296 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-2 06:34:33.299 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000009] with correlation key: batch-2 06:34:33.299 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end +++ with correlation key: batch-2 06:34:33.312 DEBUG [ file://in] zip-files-2.yaml:62 : File batches/batch-2/users.csv is for batch batch-2 06:34:33.317 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-2 06:34:33.320 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000009], newExchange: Exchange[7F40FFF506AA26C-0000000000000009] with correlation key: batch-2 06:34:33.320 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end +++ with correlation key: batch-2 06:34:33.333 DEBUG [ file://in] zip-files-2.yaml:62 : File batches/batch-1/users.csv is for batch batch-1 06:34:33.340 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-1 06:34:33.343 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-000000000000000D] with correlation key: batch-1 06:34:33.343 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end +++ with correlation key: batch-1 06:34:33.357 DEBUG [ file://in] zip-files-2.yaml:62 : File batches/batch-1/events.csv is for batch batch-1 06:34:33.364 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-1 06:34:33.367 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end +++ with correlation key: batch-1 06:34:33.367 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-000000000000000D] 06:34:33.367 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-000000000000000D] 06:34:33.370 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000009] 06:34:33.370 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000009] 06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000005] 06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000005] 06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000001] 06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000001] 06:34:33.392 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-4 from exchange 7F40FFF506AA26C-000000000000000D 06:34:33.411 INFO [Aggregator] zip-files-2.yaml:132 : Wrote batch-4 to in/send/Processing-20231120063433118-batch-4.zip 06:34:33.413 INFO [Aggregator] zip-files-2.yaml:135 : Send batch-4 06:34:33.414 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-000000000000000D] 06:34:33.415 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-000000000000000D] complete. 06:34:33.424 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-2 from exchange 7F40FFF506AA26C-0000000000000009 06:34:33.435 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-0000000000000009] 06:34:33.435 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000009] complete. 06:34:33.445 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-3 from exchange 7F40FFF506AA26C-0000000000000005 06:34:33.460 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-0000000000000005] 06:34:33.460 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000005] complete. 06:34:33.466 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-4 from exchange 7F40FFF506AA26C-0000000000000001 06:34:33.483 INFO [Aggregator] zip-files-2.yaml:132 : Wrote batch-4 to in/send/Processing-20231120063433118-batch-4.zip 06:34:33.485 INFO [Aggregator] zip-files-2.yaml:135 : Send batch-4 06:34:33.486 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-0000000000000001] 06:34:33.486 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000001] complete. 06:34:34.962 INFO [2-thread-1] ainLifecycleStrategy : Duration max idle triggering shutdown of the JVM 06:34:34.965 INFO [melContext] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) is shutting down (timeout:5s0ms) 06:34:34.982 INFO [melContext] AbstractCamelContext : Routes stopped (stopped:1) 06:34:34.982 INFO [melContext] AbstractCamelContext : Stopped zip-files (file://in) 06:34:34.994 INFO [melContext] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) shutdown in 28ms (uptime:3s) 06:34:34.995 INFO [ main] mel.main.MainSupport : Apache Camel (JBang) 4.2.0 shutdown