I am seeing the wrong correlation key set on the first exchange emitted from
the aggregate EIP.

In the log below, Exchange[7F40FFF506AA26C-000000000000000D] is aggregated
with correlation key batch-1, but when this exchange is emitted, the property
CamelAggregatedCorrelationKey is set to batch-4.
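
To make the mismatch visible on a single log line, a step along these lines
could be placed directly after the aggregate step. This is only a minimal
sketch and is not part of the route shown below: the message text is
illustrative, and it assumes the aggregation strategy keeps the Batch header
that was set before the aggregate.

```yaml
# Hypothetical diagnostic step (not in the original route).
# Place it directly after the aggregate step, before the Batch header
# is overwritten from the exchange property.
- log:
    loggingLevel: info
    # Prints the correlation-key property next to the Batch header set
    # before aggregation, plus the exchange id for cross-checking the log.
    message: "key=${exchangeProperty.CamelAggregatedCorrelationKey} batch=${header.Batch} exchange=${exchangeId}"
```

If the two values differ for the same exchange id, the property was changed
somewhere between aggregation and emission.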

I saw the same result with Camel 4.2.0 run using camel-jbang and with Camel
3.18.4 using camel-main started from Groovy or Java.
I also saw the same result when using different aggregation strategies.

Is there something wrong with the way I have configured the EIP?


--- zip-files-2.yaml ---

- route:
    id: zip-files
    from:
      uri: file:{{indir}}
      parameters:
        sort-by: reverse:file:modified #last modified will be the first 
        pre-sort: true
        recursive: true
        max-depth: 3
        min-depth: 3
        antInclude: batches/**
        antExclude: '*/processing-*/*'
        include-ext: csv
        repeatCount: 1
        synchronous: true      
      steps:
      - setHeader:
          name: Batch
          simple: ${header.CamelFileParent.replaceFirst('{{inDir}}/batches/','')}

      - aggregate: 
          correlation-expression: 
            header: Batch
          aggregation-strategy: '#class:org.apache.camel.processor.aggregate.zipfile.ZipAggregationStrategy'
          completion-from-batch-consumer: true
          eager-check-completion: true

      #simplified
      - setHeader:   
          name: LatestBatch
          constant: batch-4
      - setHeader:   
          name: RunID
          constant: 20231120063433118 


      - setHeader:
          name: Batch
          exchangeProperty: CamelAggregatedCorrelationKey

      - log: 
          loggingLevel: debug
          message: Processing batch ${headers.Batch} from exchange ${exchangeId}

      - filter:
          simple: ${headers.LatestBatch} == ${headers.Batch}
          steps:
            # for tracing
            - to: file:{{indir}}/{{sentDir}}?file-name=Processing-${header.RunID}-${header.Batch}.zip
            - log: Wrote ${headers.Batch} to ${header.CamelFileNameProduced}
        
            - log: Send ${headers.Batch}

--- tree {{indir}}/batches ---
in
├── batches
│   ├── batch-1
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-2
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-3
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-4
│   │   ├── events.csv
│   │   └── users.csv
│   └── processing-batch-5
│       ├── events.csv
│       └── users.csv

--- log ---
06:34:29.845  INFO [      main] mel.main.MainSupport : Apache Camel (JBang) 4.2.0 is starting
06:34:30.061  INFO [      main] mel.main.MainSupport : Using Java 17.0.8.1 with PID 481790. Started by vscode in /home/vscode/camel
06:34:30.956  INFO [      main] main.BaseMainSupport : Auto-configuration summary
06:34:30.956  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.main.durationMaxIdleSeconds=1
06:34:30.956  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.main.shutdownTimeout=5
06:34:30.957  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.server.enabled=false
06:34:30.957  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.server.healthCheckEnabled=true
06:34:30.957  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.server.devConsoleEnabled=true
06:34:30.958  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.health.enabled=false
06:34:30.958  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.health.exposureLevel=full
06:34:31.364  INFO [      main] or.LocalCliConnector : Camel CLI enabled (local)
06:34:31.744  INFO [      main] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) is starting
06:34:31.915  INFO [      main] e.AggregateProcessor : Defaulting to MemoryAggregationRepository
06:34:32.058  INFO [      main] main.BaseMainSupport : Property-placeholders summary
06:34:32.058  INFO [      main] main.BaseMainSupport :     [application.properties]       indir=in
06:34:32.059  INFO [      main] main.BaseMainSupport :     [application.properties]       batchDir=in/batches
06:34:32.059  INFO [      main] main.BaseMainSupport :     [application.properties]       sentDir=send
06:34:32.060  INFO [      main] main.BaseMainSupport :     [application.properties]       skippedDir=skipped
06:34:32.096  INFO [      main] AbstractCamelContext : Routes startup (started:1)
06:34:32.097  INFO [      main] AbstractCamelContext :     Started zip-files (file://in)
06:34:32.097  INFO [      main] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) started in 352ms (build:0ms init:0ms start:352ms)
06:34:32.100  INFO [      main] mel.main.MainSupport : Waiting until complete: Duration idle 1 seconds
06:34:33.140 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-4/events.csv is for batch  batch-4
06:34:33.157 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-4
06:34:33.185 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000001] with correlation key: batch-4
06:34:33.186 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-4
06:34:33.205 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-4/users.csv is for batch  batch-4
06:34:33.214 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-4
06:34:33.217 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000001], newExchange: Exchange[7F40FFF506AA26C-0000000000000001] with correlation key: batch-4
06:34:33.217 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-4
06:34:33.232 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-3/users.csv is for batch  batch-3
06:34:33.238 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-3
06:34:33.242 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000005] with correlation key: batch-3
06:34:33.242 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-3
06:34:33.261 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-3/events.csv is for batch  batch-3
06:34:33.273 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-3
06:34:33.276 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000005], newExchange: Exchange[7F40FFF506AA26C-0000000000000005] with correlation key: batch-3
06:34:33.277 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-3
06:34:33.291 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-2/events.csv is for batch  batch-2
06:34:33.296 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-2
06:34:33.299 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000009] with correlation key: batch-2
06:34:33.299 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-2
06:34:33.312 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-2/users.csv is for batch  batch-2
06:34:33.317 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-2
06:34:33.320 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000009], newExchange: Exchange[7F40FFF506AA26C-0000000000000009] with correlation key: batch-2
06:34:33.320 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-2
06:34:33.333 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-1/users.csv is for batch  batch-1
06:34:33.340 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-1
06:34:33.343 TRACE [ file://in] e.AggregateProcessor : In progress aggregated oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-000000000000000D] with correlation key: batch-1
06:34:33.343 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-1
06:34:33.357 DEBUG [ file://in]  zip-files-2.yaml:62 : File batches/batch-1/events.csv is for batch  batch-1
06:34:33.364 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start +++ with correlation key: batch-1
06:34:33.367 TRACE [ file://in] e.AggregateProcessor : onAggregation +++  end  +++ with correlation key: batch-1
06:34:33.367 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-000000000000000D]
06:34:33.367 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-000000000000000D]
06:34:33.370 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000009]
06:34:33.370 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000009]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000005]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000005]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for correlation key batch-1 sending aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000001]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000001]
06:34:33.392 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-4 from exchange 7F40FFF506AA26C-000000000000000D
06:34:33.411  INFO [Aggregator] zip-files-2.yaml:132 : Wrote batch-4 to in/send/Processing-20231120063433118-batch-4.zip
06:34:33.413  INFO [Aggregator] zip-files-2.yaml:135 : Send batch-4
06:34:33.414 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-000000000000000D]
06:34:33.415 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-000000000000000D] complete.
06:34:33.424 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-2 from exchange 7F40FFF506AA26C-0000000000000009
06:34:33.435 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-0000000000000009]
06:34:33.435 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000009] complete.
06:34:33.445 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-3 from exchange 7F40FFF506AA26C-0000000000000005
06:34:33.460 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-0000000000000005]
06:34:33.460 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000005] complete.
06:34:33.466 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-4 from exchange 7F40FFF506AA26C-0000000000000001
06:34:33.483  INFO [Aggregator] zip-files-2.yaml:132 : Wrote batch-4 to in/send/Processing-20231120063433118-batch-4.zip
06:34:33.485  INFO [Aggregator] zip-files-2.yaml:135 : Send batch-4
06:34:33.486 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange onComplete: Exchange[7F40FFF506AA26C-0000000000000001]
06:34:33.486 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000001] complete.
06:34:34.962  INFO [2-thread-1] ainLifecycleStrategy : Duration max idle triggering shutdown of the JVM
06:34:34.965  INFO [melContext] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) is shutting down (timeout:5s0ms)
06:34:34.982  INFO [melContext] AbstractCamelContext : Routes stopped (stopped:1)
06:34:34.982  INFO [melContext] AbstractCamelContext :     Stopped zip-files (file://in)
06:34:34.994  INFO [melContext] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) shutdown in 28ms (uptime:3s)
06:34:34.995  INFO [      main] mel.main.MainSupport : Apache Camel (JBang) 4.2.0 shutdown
