This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.4.x by this push:
new 95c87e81f64 CAMEL-20790: prevent an NPE under high concurrency
95c87e81f64 is described below
commit 95c87e81f6472571b9267c249fe4aa909a8a1d6d
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Jun 11 17:23:36 2024 +0200
CAMEL-20790: prevent an NPE under high concurrency
---
.../support/batching/KafkaRecordBatchingProcessor.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index b6bf0a6e2e8..ed924dc78dd 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -16,8 +16,9 @@
*/
package org.apache.camel.component.kafka.consumer.support.batching;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -45,7 +46,7 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
private final Processor processor;
private final CommitManager commitManager;
private final StopWatch watch = new StopWatch();
- private List<Exchange> exchangeList;
+ private final Queue<Exchange> exchangeList;
private final class CommitSynchronization implements Synchronization {
private final ExceptionHandler exceptionHandler;
@@ -89,6 +90,8 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
this.configuration = configuration;
this.processor = processor;
this.commitManager = commitManager;
+
+ this.exchangeList = new
ArrayBlockingQueue<Exchange>(configuration.getMaxPollRecords());
}
public Exchange toExchange(
@@ -113,8 +116,7 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
LOG.debug("There's {} records to process ... max poll is set to {}",
consumerRecords.count(),
configuration.getMaxPollRecords());
// Aggregate all consumer records in a single exchange
- if (exchangeList == null) {
- exchangeList = new ArrayList<>(configuration.getMaxPollRecords());
+ if (exchangeList.isEmpty()) {
watch.takenAndRestart();
}
@@ -125,7 +127,7 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
// poll timeout has elapsed, so check for expired records
processBatch(camelKafkaConsumer);
- exchangeList = null;
+ exchangeList.clear();
return ProcessingResult.newUnprocessed();
}
@@ -138,7 +140,7 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
if (exchangeList.size() == configuration.getMaxPollRecords()) {
processBatch(camelKafkaConsumer);
- exchangeList = null;
+ exchangeList.clear();
}
}
@@ -156,7 +158,7 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
// Create the bundle exchange
final Exchange exchange = camelKafkaConsumer.createExchange(false);
final Message message = exchange.getMessage();
- message.setBody(exchangeList);
+ message.setBody(exchangeList.stream().toList());
try {
if (configuration.isAllowManualCommit()) {