Finbarr Naughton created CAMEL-23523:
----------------------------------------

             Summary: camel-kafka: batchingIntervalMs timer not reset on new 
accumulation cycle, causing premature single-message batches after idle periods
                 Key: CAMEL-23523
                 URL: https://issues.apache.org/jira/browse/CAMEL-23523
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
    Affects Versions: 4.18.1
         Environment: - OS: macOS and Amazon Linux 2023, arm64
- Java 21 (Oracle JDK), Maven via `mvnw` wrapper (3.9.11)
- Docker available (required for Testcontainers-based Kafka)
            Reporter: Finbarr Naughton
         Attachments: KafkaBatchingIntervalResetAfterIdleIT.java

h3. Description
h4. What happens

After an idle period longer than `batchingIntervalMs`, `intervalWatch` is 
already expired when the next message arrives. On the following empty poll, 
`hasExpiredRecords()` fires immediately, flushing a batch containing only that 
single message rather than waiting to accumulate more.
h4. Why it happens

`intervalWatch` is reset only inside `processBatch()`. It is never reset when 
`exchangeList` transitions from empty to non-empty (i.e. when a new 
accumulation cycle begins), unlike `timeoutWatch`, which is reset at that point.

Affected code: `KafkaRecordBatchingProcessor.java`, `processExchange()` and 
`hasExpiredRecords()`.

 
{code:java}
// hasExpiredRecords(): the interval condition has no guard for active
// message arrival
boolean interval = configuration.getBatchingIntervalMs() != null
        && intervalWatch.taken() >= configuration.getBatchingIntervalMs();

// processExchange(): timeoutWatch is reset when accumulation begins, but
// intervalWatch is not
if (exchangeList.isEmpty()) {
    timeoutWatch.takenAndRestart();
    // intervalWatch is NOT reset here; this is the bug
}
{code}
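
The effect of the missing reset can be illustrated with a small stand-alone model. This is only a sketch under simplifying assumptions, not the real `KafkaRecordBatchingProcessor`: the class `BatchingIntervalSketch`, the `ManualWatch` helper, the method names `onRecord`/`onEmptyPoll`, and the manual clock values are all hypothetical, and the `resetIntervalOnFirstRecord` flag represents one possible fix (resetting `intervalWatch` alongside `timeoutWatch`), not a confirmed patch.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the interval timer; not Camel code.
class BatchingIntervalSketch {

    // Hypothetical stand-in for a stopwatch, driven by explicit timestamps.
    static final class ManualWatch {
        private long startedAt;
        void restart(long now) { startedAt = now; }
        long taken(long now) { return now - startedAt; }
    }

    private final long batchingIntervalMs;
    private final boolean resetIntervalOnFirstRecord; // assumed fix toggle
    private final ManualWatch intervalWatch = new ManualWatch();
    private final List<String> exchangeList = new ArrayList<>();
    private final List<Integer> flushedBatchSizes = new ArrayList<>();

    BatchingIntervalSketch(long batchingIntervalMs, boolean resetIntervalOnFirstRecord) {
        this.batchingIntervalMs = batchingIntervalMs;
        this.resetIntervalOnFirstRecord = resetIntervalOnFirstRecord;
    }

    // A record arrives; with the assumed fix, a new accumulation cycle
    // restarts the interval timer.
    void onRecord(String record, long now) {
        if (exchangeList.isEmpty() && resetIntervalOnFirstRecord) {
            intervalWatch.restart(now);
        }
        exchangeList.add(record);
    }

    // An empty poll flushes pending records once the interval has elapsed.
    void onEmptyPoll(long now) {
        boolean interval = intervalWatch.taken(now) >= batchingIntervalMs;
        if (interval && !exchangeList.isEmpty()) {
            flushedBatchSizes.add(exchangeList.size());
            exchangeList.clear();
            intervalWatch.restart(now); // mirrors the reset in processBatch()
        }
    }

    // Replays: burst of two records, long idle period, then two late records.
    static List<Integer> replayIdleScenario(boolean resetIntervalOnFirstRecord) {
        BatchingIntervalSketch p = new BatchingIntervalSketch(5000, resetIntervalOnFirstRecord);
        p.onRecord("a", 0);
        p.onRecord("b", 10);
        p.onEmptyPoll(5_000);   // burst is flushed as one batch
        p.onRecord("c", 20_000); // first record after 15 s of idle time
        p.onEmptyPoll(20_100);  // only 100 ms after "c" arrived
        p.onRecord("d", 21_000);
        p.onEmptyPoll(26_000);
        return p.flushedBatchSizes;
    }

    public static void main(String[] args) {
        System.out.println(replayIdleScenario(false)); // prints [2, 1, 1]
        System.out.println(replayIdleScenario(true));  // prints [2, 2]
    }
}
```

Without the reset, record "c" is flushed alone 100 ms after it arrives because the timer has been running since the previous flush. With the reset, "c" and "d" are accumulated into one batch.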
 
h3. Steps to Reproduce
 # Configure a Camel Kafka consumer with `batching=true`, a 
`batchingIntervalMs` value (e.g. 5000), and a `maxPollRecords` value larger 
than 1
 # Produce a burst of messages and allow them to be consumed
 # Wait for a period longer than `batchingIntervalMs` with no new messages
 # Produce a single new message
 # Observe that the message is dispatched as a single-message batch on the next 
empty poll, rather than waiting to accumulate up to `maxPollRecords`
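
For step 1, the consumer endpoint could be assembled roughly as follows. This is a minimal sketch: the class and method names, the topic `idle-reset-topic`, the broker address `localhost:9092`, and the `maxPollRecords` value of 100 are placeholders chosen for illustration, not values taken from the attached test.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that builds a camel-kafka consumer URI with batching enabled.
class KafkaBatchingUriSketch {

    static String batchingConsumerUri(String topic, String brokers,
                                      long batchingIntervalMs, int maxPollRecords) {
        // LinkedHashMap keeps the options in insertion order for a stable URI.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("brokers", brokers);
        options.put("batching", "true");
        options.put("batchingIntervalMs", Long.toString(batchingIntervalMs));
        options.put("maxPollRecords", Integer.toString(maxPollRecords));

        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        // Placeholder topic and broker; adjust to the environment under test.
        System.out.println(batchingConsumerUri("idle-reset-topic", "localhost:9092", 5000, 100));
    }
}
```

The resulting string can be passed to `from(...)` in a Camel route whose processor records the size of each batch it receives.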

h3. Reproducer

The attached sample is an integration test modelled on the existing tests in:
`components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/`
It requires a running Docker daemon for Testcontainers.

The test should:
- Produce a burst of messages and confirm they are batched correctly
- Introduce an idle period longer than `batchingIntervalMs`
- Produce one message and assert that the consumer does not immediately flush a 
single-message batch

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
