This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 33807af8f58 CAMEL-18020: fix a possible NPE due to uninitialized consumer in poll exception strategies 33807af8f58 is described below commit 33807af8f5881a4b1619a336db81ccfbafb6858d Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Thu Apr 28 15:02:41 2022 +0200 CAMEL-18020: fix a possible NPE due to uninitialized consumer in poll exception strategies --- .../java/org/apache/camel/component/kafka/KafkaFetchRecords.java | 6 +++--- .../component/kafka/consumer/errorhandler/KafkaErrorStrategies.java | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index c2002826d7c..0f090a34d41 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -73,7 +73,7 @@ public class KafkaFetchRecords implements Runnable { private final Pattern topicPattern; private final String threadId; private final Properties kafkaProps; - private final PollExceptionStrategy pollExceptionStrategy; + private PollExceptionStrategy pollExceptionStrategy; private final BridgeExceptionHandlerToErrorHandler bridge; private final ReentrantLock lock = new ReentrantLock(); private CommitManager commitManager; @@ -98,8 +98,6 @@ public class KafkaFetchRecords implements Runnable { this.consumerListener = consumerListener; this.threadId = topicName + "-" + "Thread " + id; this.kafkaProps = kafkaProps; - - this.pollExceptionStrategy = KafkaErrorStrategies.strategies(this, kafkaConsumer.getEndpoint(), consumer); } @Override @@ -257,6 +255,8 @@ public class KafkaFetchRecords implements Runnable { } } } + + this.pollExceptionStrategy = KafkaErrorStrategies.strategies(this, kafkaConsumer.getEndpoint(), consumer); } finally { Thread.currentThread().setContextClassLoader(threadClassLoader); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java index 3541e08dbce..c02bf78d1eb 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java @@ -32,6 +32,8 @@ public final class KafkaErrorStrategies { public static PollExceptionStrategy strategies( KafkaFetchRecords recordFetcher, KafkaEndpoint endpoint, Consumer<?, ?> consumer) { + assert consumer != null; + PollExceptionStrategy strategy = endpoint.getComponent().getPollExceptionStrategy(); if (strategy != null) { return strategy;