[ https://issues.apache.org/jira/browse/CAMEL-23523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18081222#comment-18081222 ]

Claus Ibsen commented on CAMEL-23523:
-------------------------------------

Thanks for reporting, and for the PR.

> camel-kafka: batchingIntervalMs timer not reset on new accumulation cycle, 
> causing premature single-message batches after idle periods
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-23523
>                 URL: https://issues.apache.org/jira/browse/CAMEL-23523
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 4.18.1
>         Environment: - OS: macOS and Amazon Linux 2023, arm64
> - Java 21 (Oracle JDK), Maven via `mvnw` wrapper (3.9.11)
> - Docker available (required for Testcontainers-based Kafka)
>            Reporter: Finbarr Naughton
>            Priority: Minor
>             Fix For: 4.18.3, 4.21.0
>
>         Attachments: KafkaBatchingIntervalResetAfterIdleIT.java
>
>
> h3. Description
> h4. What happens
> After an idle period longer than `batchingIntervalMs`, `intervalWatch` is 
> already expired when the next message arrives. On the following empty poll, 
> `hasExpiredRecords()` fires immediately, flushing a batch containing only 
> that single message rather than waiting to accumulate more.
> h4. Why it happens
> `intervalWatch` is only reset inside `processBatch()`. It is never reset when 
> `exchangeList` transitions from empty to non-empty (i.e. when a new 
> accumulation cycle begins), unlike `timeoutWatch`, which is reset at that 
> point.
> Affected code: `KafkaRecordBatchingProcessor.java`, `processExchange()` and 
> `hasExpiredRecords()`.
>  
> {code:java}
> // hasExpiredRecords(): the interval condition has no guard for active message arrival
> boolean interval = configuration.getBatchingIntervalMs() != null
>         && intervalWatch.taken() >= configuration.getBatchingIntervalMs();
>
> // processExchange(): timeoutWatch is reset when accumulation begins, but intervalWatch is not
> if (exchangeList.isEmpty()) {
>     timeoutWatch.takenAndRestart();
>     // intervalWatch is NOT reset here; this is the bug
> }
> {code}
>  
> h3. Steps to Reproduce
>  # Configure a Camel Kafka consumer with `batching=true`, a `batchingIntervalMs` value (e.g. 5000), and a `maxPollRecords` value larger than 1.
>  # Produce a burst of messages and allow them to be consumed.
>  # Wait for a period longer than `batchingIntervalMs` with no new messages.
>  # Produce a single new message.
>  # Observe that the message is dispatched as a single-message batch on the next empty poll, rather than waiting to accumulate up to `maxPollRecords`.
> h3. Reproducer
> An integration test modelled on the existing tests in
> `components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/`.
> A sample is attached (`KafkaBatchingIntervalResetAfterIdleIT.java`); it requires a running Docker daemon for Testcontainers.
> The test should:
> - Produce a burst of messages and confirm they are batched correctly
> - Introduce an idle period longer than `batchingIntervalMs`
> - Produce one message and assert that the consumer does not immediately flush 
> a single-message batch
>  
>  
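The timer behaviour described in the issue can be illustrated with a small, self-contained model. This is a hypothetical sketch, not Camel's `KafkaRecordBatchingProcessor`: the `BatchingIntervalDemo` and `Accumulator` classes, their methods, and the explicit millisecond timestamps (used instead of a real stop watch so the run is deterministic) are all assumptions. Only the interval condition from `hasExpiredRecords()` and the `maxPollRecords` size limit are modelled; the timeout condition is not. It replays the reproduction timeline with and without resetting the interval timer when `exchangeList` goes from empty to non-empty, which is the change the report suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the batching accumulation described in CAMEL-23523.
// It is NOT Camel's KafkaRecordBatchingProcessor: class, method and field names are
// illustrative assumptions. Timestamps (ms) are passed in so the run is deterministic.
public class BatchingIntervalDemo {

    static final class Accumulator {
        private final long batchingIntervalMs;
        private final int maxPollRecords;
        private final boolean resetIntervalOnNewCycle; // true = reset suggested in the report
        private final List<String> exchangeList = new ArrayList<>();
        private long intervalStart; // stands in for intervalWatch
        private long timeoutStart;  // stands in for timeoutWatch; the timeout check is not modelled
        final List<List<String>> flushed = new ArrayList<>();

        Accumulator(long batchingIntervalMs, int maxPollRecords,
                    boolean resetIntervalOnNewCycle, long now) {
            this.batchingIntervalMs = batchingIntervalMs;
            this.maxPollRecords = maxPollRecords;
            this.resetIntervalOnNewCycle = resetIntervalOnNewCycle;
            this.intervalStart = now;
            this.timeoutStart = now;
        }

        // A record was polled: a new accumulation cycle starts if nothing is buffered.
        void onRecord(String record, long now) {
            if (exchangeList.isEmpty()) {
                timeoutStart = now;
                if (resetIntervalOnNewCycle) {
                    intervalStart = now;
                }
            }
            exchangeList.add(record);
            if (exchangeList.size() >= maxPollRecords) {
                processBatch(now);
            }
        }

        // An empty poll: flush whatever is buffered if the interval has elapsed.
        void onEmptyPoll(long now) {
            if (!exchangeList.isEmpty() && hasExpiredRecords(now)) {
                processBatch(now);
            }
        }

        private boolean hasExpiredRecords(long now) {
            return now - intervalStart >= batchingIntervalMs;
        }

        // Dispatch the buffered records as one batch and restart the interval timer.
        private void processBatch(long now) {
            flushed.add(new ArrayList<>(exchangeList));
            exchangeList.clear();
            intervalStart = now;
        }
    }

    // Replays the reproduction steps: burst, idle period, single message, more messages.
    static List<List<String>> runScenario(boolean resetIntervalOnNewCycle) {
        Accumulator acc = new Accumulator(5_000, 10, resetIntervalOnNewCycle, 0);
        // Burst at t=0, flushed by the interval on an empty poll at t=5000.
        acc.onRecord("a", 0);
        acc.onRecord("b", 0);
        acc.onRecord("c", 0);
        acc.onEmptyPoll(5_000);
        // Idle for longer than batchingIntervalMs: empty polls with nothing buffered.
        acc.onEmptyPoll(10_000);
        acc.onEmptyPoll(15_000);
        // A single message arrives after the idle period, followed by an empty poll.
        acc.onRecord("d", 20_000);
        acc.onEmptyPoll(20_100);
        // Two more messages arrive, then the interval elapses.
        acc.onRecord("e", 20_200);
        acc.onRecord("f", 20_300);
        acc.onEmptyPoll(25_100);
        return acc.flushed;
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + runScenario(false));
        System.out.println("with reset:    " + runScenario(true));
    }
}
```

Without the reset, the model prints `[[a, b, c], [d], [e, f]]`: message `d` is flushed alone on the empty poll at t=20100 because the interval timer has been running since the previous flush at t=5000. With the reset, it prints `[[a, b, c], [d, e, f]]`, so `d` waits for the full interval and is batched with `e` and `f`.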



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
