Finbarr Naughton created CAMEL-23523:
----------------------------------------
Summary: camel-kafka: batchingIntervalMs timer not reset on new
accumulation cycle, causing premature single-message batches after idle periods
Key: CAMEL-23523
URL: https://issues.apache.org/jira/browse/CAMEL-23523
Project: Camel
Issue Type: Bug
Components: camel-kafka
Affects Versions: 4.18.1
Environment: - OS: macOS and Amazon Linux 2023, arm64
- Java 21 (Oracle JDK), Maven via `mvnw` wrapper (3.9.11)
- Docker available (required for Testcontainers-based Kafka)
Reporter: Finbarr Naughton
Attachments: KafkaBatchingIntervalResetAfterIdleIT.java
h3. Description
h4. What happens
After an idle period longer than `batchingIntervalMs`, `intervalWatch` is
already expired when the next message arrives. On the following empty poll,
`hasExpiredRecords()` fires immediately, flushing a batch containing only that
single message rather than waiting to accumulate more.
h4. Why it happens
`intervalWatch` is only reset inside `processBatch()`. It is never reset when
`exchangeList` transitions from empty to non-empty (i.e. when a new
accumulation cycle begins), unlike `timeoutWatch` which is reset at that point.
Affected code: `KafkaRecordBatchingProcessor.java`, `processExchange()` and
`hasExpiredRecords()`.
{code:java}
// hasExpiredRecords(): the interval condition has no guard for active message arrival
boolean interval = configuration.getBatchingIntervalMs() != null
        && intervalWatch.taken() >= configuration.getBatchingIntervalMs();

// processExchange(): timeoutWatch is reset when accumulation begins, but intervalWatch is not
if (exchangeList.isEmpty()) {
    timeoutWatch.takenAndRestart();
    // intervalWatch is NOT reset here; this is the bug
}
{code}
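To make the timing concrete, here is a minimal, self-contained sketch of the behaviour described above. It is not the Camel source: `BatchingIntervalSketch`, its fields, and the `resetOnFirstRecord` flag are hypothetical names, and a plain `long` timestamp stands in for `intervalWatch`. With the flag off it models the reported behaviour; with the flag on it models one possible fix, restarting the interval timer when the pending list goes from empty to non-empty.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the interval timer; not the Camel implementation. */
class BatchingIntervalSketch {
    private final long batchingIntervalMs;
    private final boolean resetOnFirstRecord; // true = assumed fix, false = reported behaviour
    private final List<String> pending = new ArrayList<>(); // stands in for exchangeList
    private long intervalStartMs;                           // stands in for intervalWatch

    BatchingIntervalSketch(long batchingIntervalMs, boolean resetOnFirstRecord, long nowMs) {
        this.batchingIntervalMs = batchingIntervalMs;
        this.resetOnFirstRecord = resetOnFirstRecord;
        this.intervalStartMs = nowMs;
    }

    /** Called for each consumed record. */
    void onRecord(String record, long nowMs) {
        if (pending.isEmpty() && resetOnFirstRecord) {
            intervalStartMs = nowMs; // a new accumulation cycle begins here
        }
        pending.add(record);
    }

    /** Called on an empty poll; returns the size of the flushed batch, or 0 if nothing was flushed. */
    int onEmptyPoll(long nowMs) {
        boolean expired = nowMs - intervalStartMs >= batchingIntervalMs;
        if (pending.isEmpty() || !expired) {
            return 0;
        }
        int size = pending.size();
        pending.clear();
        intervalStartMs = nowMs; // mirrors the reset that the report says happens in processBatch()
        return size;
    }

    /** Replays burst, idle period, single message, then an empty poll 100 ms later. */
    static int flushedShortlyAfterIdle(boolean resetOnFirstRecord) {
        BatchingIntervalSketch s = new BatchingIntervalSketch(5000, resetOnFirstRecord, 0);
        s.onRecord("a", 100);
        s.onRecord("b", 200);
        s.onEmptyPoll(6000);         // the burst is flushed and the timer restarts
        s.onRecord("c", 20000);      // single message after a 14 s idle period
        return s.onEmptyPoll(20100); // empty poll shortly after the message arrives
    }

    public static void main(String[] args) {
        System.out.println("reported behaviour flushes: " + flushedShortlyAfterIdle(false));
        System.out.println("with reset on first record: " + flushedShortlyAfterIdle(true));
    }
}
```

In this model the first call returns 1 (the premature single-message batch) and the second returns 0, because the timer has only been running for 100 ms when the empty poll arrives.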
h3. Steps to Reproduce
1. Configure a Camel Kafka consumer with `batching=true`, a
`batchingIntervalMs` value (e.g. 5000), and a `maxPollRecords` value larger
than 1
2. Produce a burst of messages and allow them to be consumed
3. Wait for a period longer than `batchingIntervalMs` with no new messages
4. Produce a single new message
5. Observe that the message is dispatched as a single-message batch on the next
empty poll, rather than waiting to accumulate up to `maxPollRecords`
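For step 1, the consumer options can be expressed as a Camel endpoint URI. The sketch below only assembles that URI string, so it runs without Camel on the classpath. The topic name, broker address, and the `maxPollRecords` value of 100 are placeholders chosen for illustration, and `BatchingEndpointUriSketch` is a hypothetical helper, not part of camel-kafka.

```java
/** Hypothetical helper that assembles a camel-kafka consumer URI for the reproduction steps. */
class BatchingEndpointUriSketch {

    static String endpointUri(String topic, String brokers, long batchingIntervalMs, int maxPollRecords) {
        return "kafka:" + topic
                + "?brokers=" + brokers
                + "&batching=true"
                + "&batchingIntervalMs=" + batchingIntervalMs
                + "&maxPollRecords=" + maxPollRecords;
    }

    public static void main(String[] args) {
        // Placeholder topic and broker; adjust to the local test setup.
        System.out.println(endpointUri("demo-topic", "localhost:9092", 5000, 100));
    }
}
```

The resulting string would be passed to `from(...)` in a route definition; the route itself is omitted here because it depends on the project setup.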
h3. Reproducer
An integration test modelled on the existing tests in the directory below
reproduces the issue (sample attached; it requires a running Docker daemon for
Testcontainers):
`components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/`
The test should:
- Produce a burst of messages and confirm they are batched correctly
- Introduce an idle period longer than `batchingIntervalMs`
- Produce one message and assert that the consumer does not immediately flush a
single-message batch
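The final assertion can be phrased as a check over the batches the consumer delivered. The sketch below is independent of the attached test, whose contents are not shown here; `PrematureFlushCheckSketch`, the `Batch` record, and the timestamps are hypothetical and assume the test records when each batch arrived and when the single message was produced.

```java
import java.util.List;

/** Hypothetical assertion helper for the idle-period scenario. */
class PrematureFlushCheckSketch {

    /** One delivered batch: arrival time in ms and number of messages. */
    record Batch(long receivedAtMs, int size) {}

    /**
     * Returns true if any batch arrived after the single message was produced
     * but before a full batchingIntervalMs had elapsed since then.
     */
    static boolean flushedPrematurely(List<Batch> batches, long producedAtMs, long batchingIntervalMs) {
        long earliestAllowedMs = producedAtMs + batchingIntervalMs;
        return batches.stream()
                .anyMatch(b -> b.receivedAtMs() >= producedAtMs && b.receivedAtMs() < earliestAllowedMs);
    }

    public static void main(String[] args) {
        List<Batch> observed = List.of(new Batch(6000, 2), new Batch(20100, 1));
        System.out.println(flushedPrematurely(observed, 20000, 5000));
    }
}
```

A real test would likely allow some tolerance around the interval boundary, since poll timing is not exact.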
--
This message was sent by Atlassian Jira
(v8.20.10#820010)