I am seeing wrong correlation key is set for the first exchange emitted from
aggregate EIP.
In the log below,
Exchange[7F40FFF506AA26C-000000000000000D] is aggregating with correlation key:
batch-1, but when this exchange is emitted, the property
CamelAggregatedCorrelationKey is set as batch-4
I saw the same result with Camel v 4.2.0 run using camel-jbang and Camel v
3.18.4 using camel-main started from Groovy or Java
I also saw the same result when using different aggregation strategies.
Is there something wrong with the way I have configured the EIP?
--- zip-files-2.yaml ---
- route:
id: zip-files
from:
uri: file:{{indir}}
parameters:
sort-by: reverse:file:modified #last modified will be the first
pre-sort: true
recursive: true
max-depth: 3
min-depth: 3
antInclude: batches/**
antExclude: '*/processing-*/*'
include-ext: csv
repeatCount: 1
synchronous: true
steps:
- setHeader:
name: Batch
simple:
${header.CamelFileParent.replaceFirst('{{inDir}}/batches/','')}
- aggregate:
correlation-expression:
header: Batch
aggregation-strategy:
'#class:org.apache.camel.processor.aggregate.zipfile.ZipAggregationStrategy'
completion-from-batch-consumer: true
eager-check-completion: true
#simplified
- setHeader:
name: LatestBatch
constant: batch-4
- setHeader:
name: RunID
constant: 20231120063433118
- setHeader:
name: Batch
exchangeProperty: CamelAggregatedCorrelationKey
- log:
loggingLevel: debug
message: Processing batch ${headers.Batch} from exchange ${exchangeId}
- filter:
simple: ${headers.LatestBatch} == ${headers.Batch}
steps:
# for tracing
- to:
file:{{indir}}/{{sentDir}}?file-name=Processing-${header.RunID}-${header.Batch}.zip
- log: Wrote ${headers.Batch} to ${header.CamelFileNameProduced}
- log: Send ${headers.Batch}
--- tree {{indir}}/batches --
in
├── batches
│ ├── batch-1
│ │ ├── events.csv
│ │ └── users.csv
│ ├── batch-2
│ │ ├── events.csv
│ │ └── users.csv
│ ├── batch-3
│ │ ├── events.csv
│ │ └── users.csv
│ ├── batch-4
│ │ ├── events.csv
│ │ └── users.csv
│ └── processing-batch-5
│ ├── events.csv
│ └── users.csv
-- log ---
06:34:29.845 INFO [ main] mel.main.MainSupport : Apache Camel (JBang)
4.2.0 is starting
06:34:30.061 INFO [ main] mel.main.MainSupport : Using Java 17.0.8.1 with
PID 481790. Started by vscode in /home/vscode/camel
06:34:30.956 INFO [ main] main.BaseMainSupport : Auto-configuration
summary
06:34:30.956 INFO [ main] main.BaseMainSupport :
[application.properties] camel.main.durationMaxIdleSeconds=1
06:34:30.956 INFO [ main] main.BaseMainSupport :
[application.properties] camel.main.shutdownTimeout=5
06:34:30.957 INFO [ main] main.BaseMainSupport :
[application.properties] camel.server.enabled=false
06:34:30.957 INFO [ main] main.BaseMainSupport :
[application.properties] camel.server.healthCheckEnabled=true
06:34:30.957 INFO [ main] main.BaseMainSupport :
[application.properties] camel.server.devConsoleEnabled=true
06:34:30.958 INFO [ main] main.BaseMainSupport :
[application.properties] camel.health.enabled=false
06:34:30.958 INFO [ main] main.BaseMainSupport :
[application.properties] camel.health.exposureLevel=full
06:34:31.364 INFO [ main] or.LocalCliConnector : Camel CLI enabled (local)
06:34:31.744 INFO [ main] AbstractCamelContext : Apache Camel 4.2.0
(zip-files-2) is starting
06:34:31.915 INFO [ main] e.AggregateProcessor : Defaulting to
MemoryAggregationRepository
06:34:32.058 INFO [ main] main.BaseMainSupport : Property-placeholders
summary
06:34:32.058 INFO [ main] main.BaseMainSupport :
[application.properties] indir=in
06:34:32.059 INFO [ main] main.BaseMainSupport :
[application.properties] batchDir=in/batches
06:34:32.059 INFO [ main] main.BaseMainSupport :
[application.properties] sentDir=send
06:34:32.060 INFO [ main] main.BaseMainSupport :
[application.properties] skippedDir=skipped
06:34:32.096 INFO [ main] AbstractCamelContext : Routes startup
(started:1)
06:34:32.097 INFO [ main] AbstractCamelContext : Started zip-files
(file://in)
06:34:32.097 INFO [ main] AbstractCamelContext : Apache Camel 4.2.0
(zip-files-2) started in 352ms (build:0ms init:0ms start:352ms)
06:34:32.100 INFO [ main] mel.main.MainSupport : Waiting until complete:
Duration idle 1 seconds
06:34:33.140 DEBUG [ file://in] zip-files-2.yaml:62 : File
batches/batch-4/events.csv is for batch batch-4
06:34:33.157 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start
+++ with correlation key: batch-4
06:34:33.185 TRACE [ file://in] e.AggregateProcessor : In progress aggregated
oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000001] with
correlation key: batch-4
06:34:33.186 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end
+++ with correlation key: batch-4
06:34:33.205 DEBUG [ file://in] zip-files-2.yaml:62 : File
batches/batch-4/users.csv is for batch batch-4
06:34:33.214 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start
+++ with correlation key: batch-4
06:34:33.217 TRACE [ file://in] e.AggregateProcessor : In progress aggregated
oldExchange: Exchange[7F40FFF506AA26C-0000000000000001], newExchange:
Exchange[7F40FFF506AA26C-0000000000000001] with correlation key: batch-4
06:34:33.217 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end
+++ with correlation key: batch-4
06:34:33.232 DEBUG [ file://in] zip-files-2.yaml:62 : File
batches/batch-3/users.csv is for batch batch-3
06:34:33.238 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start
+++ with correlation key: batch-3
06:34:33.242 TRACE [ file://in] e.AggregateProcessor : In progress aggregated
oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000005] with
correlation key: batch-3
06:34:33.242 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end
+++ with correlation key: batch-3
06:34:33.261 DEBUG [ file://in] zip-files-2.yaml:62 : File
batches/batch-3/events.csv is for batch batch-3
06:34:33.273 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start
+++ with correlation key: batch-3
06:34:33.276 TRACE [ file://in] e.AggregateProcessor : In progress aggregated
oldExchange: Exchange[7F40FFF506AA26C-0000000000000005], newExchange:
Exchange[7F40FFF506AA26C-0000000000000005] with correlation key: batch-3
06:34:33.277 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end
+++ with correlation key: batch-3
06:34:33.291 DEBUG [ file://in] zip-files-2.yaml:62 : File
batches/batch-2/events.csv is for batch batch-2
06:34:33.296 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start
+++ with correlation key: batch-2
06:34:33.299 TRACE [ file://in] e.AggregateProcessor : In progress aggregated
oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-0000000000000009] with
correlation key: batch-2
06:34:33.299 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end
+++ with correlation key: batch-2
06:34:33.312 DEBUG [ file://in] zip-files-2.yaml:62 : File
batches/batch-2/users.csv is for batch batch-2
06:34:33.317 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start
+++ with correlation key: batch-2
06:34:33.320 TRACE [ file://in] e.AggregateProcessor : In progress aggregated
oldExchange: Exchange[7F40FFF506AA26C-0000000000000009], newExchange:
Exchange[7F40FFF506AA26C-0000000000000009] with correlation key: batch-2
06:34:33.320 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end
+++ with correlation key: batch-2
06:34:33.333 DEBUG [ file://in] zip-files-2.yaml:62 : File
batches/batch-1/users.csv is for batch batch-1
06:34:33.340 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start
+++ with correlation key: batch-1
06:34:33.343 TRACE [ file://in] e.AggregateProcessor : In progress aggregated
oldExchange: null, newExchange: Exchange[7F40FFF506AA26C-000000000000000D] with
correlation key: batch-1
06:34:33.343 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end
+++ with correlation key: batch-1
06:34:33.357 DEBUG [ file://in] zip-files-2.yaml:62 : File
batches/batch-1/events.csv is for batch batch-1
06:34:33.364 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ start
+++ with correlation key: batch-1
06:34:33.367 TRACE [ file://in] e.AggregateProcessor : onAggregation +++ end
+++ with correlation key: batch-1
06:34:33.367 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for
correlation key batch-1 sending aggregated exchange:
Exchange[7F40FFF506AA26C-000000000000000D]
06:34:33.367 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated
exchange: Exchange[7F40FFF506AA26C-000000000000000D]
06:34:33.370 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for
correlation key batch-1 sending aggregated exchange:
Exchange[7F40FFF506AA26C-0000000000000009]
06:34:33.370 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated
exchange: Exchange[7F40FFF506AA26C-0000000000000009]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for
correlation key batch-1 sending aggregated exchange:
Exchange[7F40FFF506AA26C-0000000000000005]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated
exchange: Exchange[7F40FFF506AA26C-0000000000000005]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Aggregation complete for
correlation key batch-1 sending aggregated exchange:
Exchange[7F40FFF506AA26C-0000000000000001]
06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Processing aggregated
exchange: Exchange[7F40FFF506AA26C-0000000000000001]
06:34:33.392 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-4
from exchange 7F40FFF506AA26C-000000000000000D
06:34:33.411 INFO [Aggregator] zip-files-2.yaml:132 : Wrote batch-4 to
in/send/Processing-20231120063433118-batch-4.zip
06:34:33.413 INFO [Aggregator] zip-files-2.yaml:135 : Send batch-4
06:34:33.414 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange
onComplete: Exchange[7F40FFF506AA26C-000000000000000D]
06:34:33.415 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated
exchange: Exchange[7F40FFF506AA26C-000000000000000D] complete.
06:34:33.424 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-2
from exchange 7F40FFF506AA26C-0000000000000009
06:34:33.435 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange
onComplete: Exchange[7F40FFF506AA26C-0000000000000009]
06:34:33.435 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated
exchange: Exchange[7F40FFF506AA26C-0000000000000009] complete.
06:34:33.445 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-3
from exchange 7F40FFF506AA26C-0000000000000005
06:34:33.460 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange
onComplete: Exchange[7F40FFF506AA26C-0000000000000005]
06:34:33.460 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated
exchange: Exchange[7F40FFF506AA26C-0000000000000005] complete.
06:34:33.466 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch batch-4
from exchange 7F40FFF506AA26C-0000000000000001
06:34:33.483 INFO [Aggregator] zip-files-2.yaml:132 : Wrote batch-4 to
in/send/Processing-20231120063433118-batch-4.zip
06:34:33.485 INFO [Aggregator] zip-files-2.yaml:135 : Send batch-4
06:34:33.486 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange
onComplete: Exchange[7F40FFF506AA26C-0000000000000001]
06:34:33.486 TRACE [Aggregator] e.AggregateProcessor : Processing aggregated
exchange: Exchange[7F40FFF506AA26C-0000000000000001] complete.
06:34:34.962 INFO [2-thread-1] ainLifecycleStrategy : Duration max idle
triggering shutdown of the JVM
06:34:34.965 INFO [melContext] AbstractCamelContext : Apache Camel 4.2.0
(zip-files-2) is shutting down (timeout:5s0ms)
06:34:34.982 INFO [melContext] AbstractCamelContext : Routes stopped
(stopped:1)
06:34:34.982 INFO [melContext] AbstractCamelContext : Stopped zip-files
(file://in)
06:34:34.994 INFO [melContext] AbstractCamelContext : Apache Camel 4.2.0
(zip-files-2) shutdown in 28ms (uptime:3s)
06:34:34.995 INFO [ main] mel.main.MainSupport : Apache Camel (JBang)
4.2.0 shutdown