This appears to be a bug. The attached test case is failing at line #95

The issue appears to be around the following line of code:

onCompletion(batchKey, originalExchange, batchAnswer, false, aggregateFailed);

https://github.com/apache/camel/blob/cd5c790e18f00288d1ac62aca909efb99a7a4846/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java#L633

This call is made for every correlation key in the batch, each time with the 
same originalExchange. The method onCompletion sets the property 
CamelAggregatedCorrelationKey on originalExchange, so originalExchange is left 
holding whichever key was processed last. originalExchange is (or appears to 
be) the stored aggregate for the current message that triggered completion, so 
there does not appear to be a reason to associate it with the other keys. 
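
To make the suspected overwrite concrete, here is a minimal sketch that does not use Camel at all. FakeExchange, completeAll and the shortened exchange ids are hypothetical stand-ins, and the onCompletion below only models the behaviour described above (the key is stamped on both the original and the aggregated exchange). It assumes the original exchange is the same object as the stored aggregate for batch-1, the key that triggered completion in the log.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CorrelationKeyOverwriteDemo {

    static final String KEY_PROPERTY = "CamelAggregatedCorrelationKey";

    /** Hypothetical stand-in for a Camel Exchange: an id plus a property map. */
    static final class FakeExchange {
        final String id;
        final Map<String, String> properties = new LinkedHashMap<>();

        FakeExchange(String id) {
            this.id = id;
        }
    }

    /** Models the described behaviour: the key is stamped on both exchanges. */
    static void onCompletion(String key, FakeExchange original, FakeExchange aggregated) {
        if (original != null) {
            original.properties.put(KEY_PROPERTY, key);
        }
        aggregated.properties.put(KEY_PROPERTY, key);
    }

    /**
     * Completes every batch key in turn and returns exchange id -> stamped key.
     * When shareOriginal is true, the same original is passed for every key.
     */
    static Map<String, String> completeAll(boolean shareOriginal) {
        // One stored aggregate per correlation key (ids shortened from the log).
        Map<String, FakeExchange> repository = new LinkedHashMap<>();
        repository.put("batch-1", new FakeExchange("0D"));
        repository.put("batch-2", new FakeExchange("09"));
        repository.put("batch-3", new FakeExchange("05"));
        repository.put("batch-4", new FakeExchange("01"));

        // Assumption: the original is the stored aggregate of the triggering key.
        String triggeringKey = "batch-1";
        FakeExchange original = repository.get(triggeringKey);

        for (Map.Entry<String, FakeExchange> entry : repository.entrySet()) {
            String batchKey = entry.getKey();
            FakeExchange passed =
                    (shareOriginal || batchKey.equals(triggeringKey)) ? original : null;
            onCompletion(batchKey, passed, entry.getValue());
        }

        Map<String, String> stamped = new LinkedHashMap<>();
        for (FakeExchange exchange : repository.values()) {
            stamped.put(exchange.id, exchange.properties.get(KEY_PROPERTY));
        }
        return stamped;
    }

    public static void main(String[] args) {
        System.out.println("shared original:  " + completeAll(true));
        System.out.println("original per key: " + completeAll(false));
    }
}
```

With the shared original, exchange 0D ends up stamped batch-4 while the other three keep their own keys, which is the pattern in the log. Passing the original only for its own key is just one conceivable way to avoid the overwrite in this toy model; it is not a proposed patch for AggregateProcessor.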



On Monday, 20 November, 2023 at 11:01:15 am GST, Dinu Pavithran 
<din...@yahoo.com.invalid> wrote: 

I am seeing the wrong correlation key set on the first exchange emitted from 
the aggregate EIP. 

In the log below, 
Exchange[7F40FFF506AA26C-000000000000000D] is aggregated with correlation key 
batch-1, but when this exchange is emitted, the property 
CamelAggregatedCorrelationKey is set to batch-4.
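
The mismatch can also be checked mechanically. The following is only a sketch: the class and method names are hypothetical, and it assumes each log entry is on a single line (not wrapped as in this mail). It records the key under which each exchange was first aggregated and compares it with the key printed by the route's own "Processing batch" log step.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CorrelationKeyLogCheck {

    // TRACE line written when a new group starts (oldExchange is null).
    private static final Pattern FIRST_AGGREGATION = Pattern.compile(
            "oldExchange: null, newExchange: Exchange\\[([0-9A-F-]+)\\] with correlation key: (\\S+)");

    // DEBUG line written by the route's log step after the aggregate.
    private static final Pattern EMITTED = Pattern.compile(
            "Processing batch (\\S+) from exchange ([0-9A-F-]+)");

    /** Returns exchange id -> "aggregated key -> emitted key" for every mismatch. */
    static Map<String, String> findMismatches(List<String> lines) {
        Map<String, String> aggregatedUnder = new HashMap<>();
        Map<String, String> mismatches = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher first = FIRST_AGGREGATION.matcher(line);
            if (first.find()) {
                aggregatedUnder.put(first.group(1), first.group(2));
                continue;
            }
            Matcher emitted = EMITTED.matcher(line);
            if (emitted.find()) {
                String expected = aggregatedUnder.get(emitted.group(2));
                String actual = emitted.group(1);
                if (expected != null && !expected.equals(actual)) {
                    mismatches.put(emitted.group(2), expected + " -> " + actual);
                }
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        // Two entries taken from the log in this mail, joined onto one line each.
        List<String> lines = List.of(
                "06:34:33.343 TRACE [ file://in] e.AggregateProcessor : In progress aggregated "
                        + "oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-000000000000000D] "
                        + "with correlation key: batch-1",
                "06:34:33.392 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-4 "
                        + "from exchange 7F40FFF506AA26C-000000000000000D");
        System.out.println(findMismatches(lines));
    }
}
```

Run against the full log, only the exchange ending in 0D should be reported, since the other three are emitted with the key they were aggregated under.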

I saw the same result with Camel 4.2.0 run using camel-jbang and with Camel 
3.18.4 using camel-main started from Groovy or Java.
I also saw the same result when using different aggregation strategies.

Is there something wrong with the way I have configured the EIP?


--- zip-files-2.yaml ---

- route:
    id: zip-files
    from:
      uri: file:{{indir}}
      parameters:
        sort-by: reverse:file:modified #last modified will be the first 
        pre-sort: true
        recursive: true
        max-depth: 3
        min-depth: 3
        antInclude: batches/**
        antExclude: '*/processing-*/*'
        include-ext: csv
        repeatCount: 1
        synchronous: true      
      steps:
      - setHeader:
          name: Batch
          simple: ${header.CamelFileParent.replaceFirst('{{inDir}}/batches/','')}

      - aggregate: 
          correlation-expression: 
            header: Batch
          aggregation-strategy: '#class:org.apache.camel.processor.aggregate.zipfile.ZipAggregationStrategy'
          completion-from-batch-consumer: true
          eager-check-completion: true

      #simplified
      - setHeader:   
          name: LatestBatch
          constant: batch-4
      - setHeader:   
          name: RunID
          constant: 20231120063433118 


      - setHeader:
          name: Batch
          exchangeProperty: CamelAggregatedCorrelationKey

      - log: 
          loggingLevel: debug
          message: Processing batch ${headers.Batch} from exchange ${exchangeId}

      - filter:
          simple: ${headers.LatestBatch} == ${headers.Batch}
          steps:
            # for tracing
            - to: file:{{indir}}/{{sentDir}}?file-name=Processing-${header.RunID}-${header.Batch}.zip
            - log: Wrote ${headers.Batch} to ${header.CamelFileNameProduced}
        
            - log: Send ${headers.Batch}
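
For reference, the Batch header expression in the route above boils down to a plain String.replaceFirst call. The sketch below assumes the placeholder resolves to "in" (as in the property summary in the log) and that CamelFileParent looks like "in/batches/batch-4"; both values and the helper name batchName are illustrative assumptions. Unlike the route, the sketch quotes the prefix, because replaceFirst interprets its first argument as a regular expression.

```java
import java.util.regex.Pattern;

public class BatchHeaderDemo {

    /** Strips the "<inDir>/batches/" prefix from a parent path. */
    static String batchName(String fileParent, String inDir) {
        // Pattern.quote makes the prefix match literally rather than as a regex.
        return fileParent.replaceFirst(Pattern.quote(inDir + "/batches/"), "");
    }

    public static void main(String[] args) {
        System.out.println(batchName("in/batches/batch-4", "in")); // prints batch-4
    }
}
```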

--- tree {{indir}}/batches ---
in
├── batches
│   ├── batch-1
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-2
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-3
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-4
│   │   ├── events.csv
│   │   └── users.csv
│   └── processing-batch-5
│       ├── events.csv
│       └── users.csv

--- log ---
06:34:29.845  INFO [      main] mel.main.MainSupport : Apache Camel (JBang) 4.2.0 is starting
06:34:30.061  INFO [      main] mel.main.MainSupport : Using Java 17.0.8.1 with PID 481790. Started by vscode in /home/vscode/camel
06:34:30.956  INFO [      main] main.BaseMainSupport : Auto-configuration summary
06:34:30.956  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.main.durationMaxIdleSeconds=1
06:34:30.956  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.main.shutdownTimeout=5
06:34:30.957  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.server.enabled=false
06:34:30.957  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.server.healthCheckEnabled=true
06:34:30.957  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.server.devConsoleEnabled=true
06:34:30.958  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.health.enabled=false
06:34:30.958  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.health.exposureLevel=full
06:34:31.364  INFO [      main] or.LocalCliConnector : Camel CLI enabled (local)
06:34:31.744  INFO [      main] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) is starting
06:34:31.915  INFO [      main] e.AggregateProcessor : Defaulting to MemoryAggregationRepository
06:34:32.058  INFO [      main] main.BaseMainSupport : Property-placeholders summary
06:34:32.058  INFO [      main] main.BaseMainSupport :     [application.properties]       indir=in
06:34:32.059  INFO [      main] main.BaseMainSupport :     [application.properties]       batchDir=in/batches
06:34:32.059  INFO [      main] main.BaseMainSupport :     [application.properties]       sentDir=send
06:34:32.060  INFO [      main] main.BaseMainSupport :     [application.properties]       skippedDir=skipped
06:34:32.096  INFO [      main] AbstractCamelContext : Routes startup (started:1)
06:34:32.097  INFO [      main] AbstractCamelContext :     Started zip-files (file://in)
06:34:32.097  INFO [      main] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) started in 352ms (build:0ms init:0ms start:352ms)
06:34:32.100  INFO [      main] mel.main.MainSupport : Waiting until complete: Duration idle 1 seconds
06:34:33.140 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-4/events.csv is for batch  batch-4
06:34:33.157 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-4
06:34:33.185 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000001] with correlation key: batch-4
06:34:33.186 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-4
06:34:33.205 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-4/users.csv is for batch  batch-4
06:34:33.214 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-4
06:34:33.217 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000001], newExchange: Exchange[7F40FFF506AA26C-0000000000000001] with correlation key: batch-4
06:34:33.217 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-4
06:34:33.232 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-3/users.csv is for batch  batch-3
06:34:33.238 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-3
06:34:33.242 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000005] with correlation key: batch-3
06:34:33.242 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-3
06:34:33.261 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-3/events.csv is for batch  batch-3
06:34:33.273 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-3
06:34:33.276 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000005], newExchange: Exchange[7F40FFF506AA26C-0000000000000005] with correlation key: batch-3
06:34:33.277 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-3
06:34:33.291 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-2/events.csv is for batch  batch-2
06:34:33.296 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-2
06:34:33.299 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000009] with correlation key: batch-2
06:34:33.299 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-2
06:34:33.312 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-2/users.csv is for batch  batch-2
06:34:33.317 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-2
06:34:33.320 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000009], newExchange: Exchange[7F40FFF506AA26C-0000000000000009] with correlation key: batch-2
06:34:33.320 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-2
06:34:33.333 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-1/users.csv is for batch  batch-1
06:34:33.340 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-1
06:34:33.343 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-000000000000000D] with correlation key: batch-1
06:34:33.343 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-1
06:34:33.357 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-1/events.csv is for batch  batch-1
06:34:33.364 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-1
06:34:33.367 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-1
06:34:33.367 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-000000000000000D]
06:34:33.367 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-000000000000000D]
06:34:33.370 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000009]
06:34:33.370 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000009]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000005]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000005]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000001]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000001]
06:34:33.392 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-4 from exchange 7F40FFF506AA26C-000000000000000D
06:34:33.411  INFO [Aggregator] zip-files-2.yaml:132 : Wrote batch-4 to in/send/Processing-20231120063433118-batch-4.zip
06:34:33.413  INFO [Aggregator] zip-files-2.yaml:135 : Send batch-4
06:34:33.414 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-000000000000000D]
06:34:33.415 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-000000000000000D] complete.
06:34:33.424 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-2 from exchange 7F40FFF506AA26C-0000000000000009
06:34:33.435 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-0000000000000009]
06:34:33.435 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000009] complete.
06:34:33.445 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-3 from exchange 7F40FFF506AA26C-0000000000000005
06:34:33.460 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-0000000000000005]
06:34:33.460 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000005] complete.
06:34:33.466 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-4 from exchange 7F40FFF506AA26C-0000000000000001
06:34:33.483  INFO [Aggregator] zip-files-2.yaml:132 : Wrote batch-4 to in/send/Processing-20231120063433118-batch-4.zip
06:34:33.485  INFO [Aggregator] zip-files-2.yaml:135 : Send batch-4
06:34:33.486 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-0000000000000001]
06:34:33.486 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000001] complete.
06:34:34.962  INFO [2-thread-1] ainLifecycleStrategy : Duration max idle triggering shutdown of the JVM
06:34:34.965  INFO [melContext] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) is shutting down (timeout:5s0ms)
06:34:34.982  INFO [melContext] AbstractCamelContext : Routes stopped (stopped:1)
06:34:34.982  INFO [melContext] AbstractCamelContext :     Stopped zip-files (file://in)
06:34:34.994  INFO [melContext] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) shutdown in 28ms (uptime:3s)
06:34:34.995  INFO [      main] mel.main.MainSupport : Apache Camel (JBang) 4.2.0 shutdown
