[
https://issues.apache.org/jira/browse/CAMEL-23523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen updated CAMEL-23523:
--------------------------------
Fix Version/s: 4.14.8
> camel-kafka: batchingIntervalMs timer not reset on new accumulation cycle,
> causing premature single-message batches after idle periods
> --------------------------------------------------------------------------------------------------------------------------------------
>
> Key: CAMEL-23523
> URL: https://issues.apache.org/jira/browse/CAMEL-23523
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 4.18.1
> Environment: - OS: macOS and Amazon Linux 2023, arm64
> - Java 21 (Oracle JDK), Maven via `mvnw` wrapper (3.9.11)
> - Docker available (required for Testcontainers-based Kafka)
> Reporter: Finbarr Naughton
> Priority: Minor
> Fix For: 4.14.8, 4.18.3, 4.21.0
>
> Attachments: KafkaBatchingIntervalResetAfterIdleIT.java
>
>
> h3. Description
> h4. What happens
> After an idle period longer than `batchingIntervalMs`, `intervalWatch` has
> already expired by the time the next message arrives. On the following empty
> poll, `hasExpiredRecords()` returns true immediately and flushes a batch that
> contains only that single message, instead of waiting to accumulate more.
> h4. Why it happens
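> `intervalWatch` is only reset inside `processBatch()`. It is never reset when
> `exchangeList` transitions from empty to non-empty (i.e. when a new
> accumulation cycle begins), unlike `timeoutWatch`, which is reset at that
> point. The interval is therefore measured from the previous flush rather than
> from the first message of the current batch.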
> Affected code: `KafkaRecordBatchingProcessor.java`, `processExchange()` and
> `hasExpiredRecords()`.
>
> {code:java}
> // hasExpiredRecords() - the interval condition has no guard for active message arrival
> boolean interval = configuration.getBatchingIntervalMs() != null
>         && intervalWatch.taken() >= configuration.getBatchingIntervalMs();
>
> // processExchange() - timeoutWatch is reset when accumulation begins, but intervalWatch is not
> if (exchangeList.isEmpty()) {
>     timeoutWatch.takenAndRestart();
>     // intervalWatch is NOT reset here; this is the bug
> }
> {code}
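To make the mechanism concrete, here is a minimal, self-contained Java model of the accumulation logic described above, with a candidate fix (restarting `intervalWatch` alongside `timeoutWatch` when a new cycle begins) behind a flag. It is a sketch under stated assumptions, not the real `KafkaRecordBatchingProcessor`: the `Watch` class, the manual millisecond clock, `onEmptyPoll()`, `replay()` and the flush-when-`maxPollRecords`-is-reached rule are hypothetical simplifications, and only the `batchingIntervalMs` condition of `hasExpiredRecords()` is modelled (the null check is omitted).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the batching logic described in CAMEL-23523.
 * NOT the real KafkaRecordBatchingProcessor: Watch, the manual clock and the
 * method names other than processExchange/hasExpiredRecords/processBatch are assumptions.
 */
class BatchingIntervalSketch {

    /** Minimal stop watch driven by an explicit "now" in milliseconds (assumption). */
    static final class Watch {
        private long start;
        Watch(long now) { this.start = now; }
        long taken(long now) { return now - start; }
        void restart(long now) { start = now; }
    }

    private final long batchingIntervalMs;
    private final int maxPollRecords;
    private final boolean resetIntervalOnNewCycle; // true = candidate fix enabled
    private final List<String> exchangeList = new ArrayList<>();
    private final Watch intervalWatch;
    private final Watch timeoutWatch;
    final List<Integer> flushedBatchSizes = new ArrayList<>();

    BatchingIntervalSketch(long intervalMs, int maxPollRecords, boolean fix, long now) {
        this.batchingIntervalMs = intervalMs;
        this.maxPollRecords = maxPollRecords;
        this.resetIntervalOnNewCycle = fix;
        this.intervalWatch = new Watch(now);
        this.timeoutWatch = new Watch(now);
    }

    /** Called for each record returned by a non-empty poll. */
    void processExchange(String record, long now) {
        if (exchangeList.isEmpty()) {
            // A new accumulation cycle begins.
            timeoutWatch.restart(now);
            if (resetIntervalOnNewCycle) {
                intervalWatch.restart(now); // candidate fix: start the interval with the batch
            }
        }
        exchangeList.add(record);
        if (exchangeList.size() >= maxPollRecords) {
            processBatch(now);
        }
    }

    /** Called when a poll returns no records. */
    void onEmptyPoll(long now) {
        if (!exchangeList.isEmpty() && hasExpiredRecords(now)) {
            processBatch(now);
        }
    }

    private boolean hasExpiredRecords(long now) {
        return intervalWatch.taken(now) >= batchingIntervalMs;
    }

    private void processBatch(long now) {
        flushedBatchSizes.add(exchangeList.size());
        exchangeList.clear();
        intervalWatch.restart(now); // in the current code, the only place intervalWatch is reset
    }

    /** Replays a burst, an idle gap and one late message; returns flushed batch sizes. */
    static List<Integer> replay(boolean fix) {
        BatchingIntervalSketch p = new BatchingIntervalSketch(5000, 10, fix, 0);
        for (int i = 0; i < 10; i++) {
            p.processExchange("burst-" + i, 100); // burst fills a full batch at t=100
        }
        p.processExchange("late", 7000); // single message after 6.9 s of idle time
        p.onEmptyPoll(7100);             // next empty poll
        return p.flushedBatchSizes;
    }

    public static void main(String[] args) {
        System.out.println("buggy: " + replay(false)); // buggy: [10, 1]
        System.out.println("fixed: " + replay(true));  // fixed: [10]
    }
}
```

With the flag off, the late message is flushed alone on the very next empty poll because the timer still dates from the previous flush at t=100; with the flag on, the timer restarts at t=7000 and the message stays in the accumulating batch.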
>
> h3. Steps to Reproduce
> # Configure a Camel Kafka consumer with `batching=true`, a `batchingIntervalMs` value (e.g. 5000), and a `maxPollRecords` value larger than 1
> # Produce a burst of messages and allow them to be consumed
> # Wait for a period longer than `batchingIntervalMs` with no new messages
> # Produce a single new message
> # Observe that the message is dispatched as a single-message batch on the next empty poll, rather than waiting to accumulate up to `maxPollRecords`
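As a worked example of the timing in the last step, the sketch below computes the first empty poll at which the interval check passes. The helper `FlushTiming.firstFlushAt`, the fixed poll period and the numbers (previous flush at t=100 ms, the single message at t=7000 ms, polls every 100 ms, `batchingIntervalMs` = 5000) are hypothetical; it assumes that, once the timer is reset on the new cycle, the interval is measured from the arrival of that first message.

```java
/**
 * Hypothetical timing helper: assumes empty polls happen every pollPeriodMs
 * after the message arrives and that only the batchingIntervalMs check flushes.
 */
final class FlushTiming {

    /** Time of the first poll after arrivalMs at which (now - timerStartMs) >= intervalMs. */
    static long firstFlushAt(long timerStartMs, long arrivalMs, long intervalMs, long pollPeriodMs) {
        long deadline = timerStartMs + intervalMs;
        long ticks = 1; // at least the first poll after the message arrives
        if (deadline > arrivalMs + pollPeriodMs) {
            // ceiling division: number of poll periods needed to reach the deadline
            ticks = (deadline - arrivalMs + pollPeriodMs - 1) / pollPeriodMs;
        }
        return arrivalMs + ticks * pollPeriodMs;
    }

    public static void main(String[] args) {
        long interval = 5000, lastFlush = 100, arrival = 7000, poll = 100;
        // Current behaviour: the timer still dates from the last flush.
        System.out.println(firstFlushAt(lastFlush, arrival, interval, poll)); // 7100
        // Timer restarted when the new accumulation cycle begins.
        System.out.println(firstFlushAt(arrival, arrival, interval, poll));   // 12000
    }
}
```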
> h3. Reproducer
> An integration test modelled on the existing tests in
> `components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/`
> (sample attached; requires a running Docker daemon for Testcontainers).
> The test should:
> - Produce a burst of messages and confirm they are batched correctly
> - Introduce an idle period longer than `batchingIntervalMs`
> - Produce one message and assert that the consumer does not immediately flush a single-message batch
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)