This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 744f2b4  CAMEL-17509: fix invalid topic info displayed when using topic patterns
744f2b4 is described below

commit 744f2b4d78edcde9e1fb02916e11dc7db7d4d821
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Jan 18 12:13:45 2022 +0100

    CAMEL-17509: fix invalid topic info displayed when using topic patterns
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 57 ++++++++++++++--------
 .../support/PartitionAssignmentListener.java       | 11 +++--
 2 files changed, 43 insertions(+), 25 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index a2347fa..0d05e78 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -94,7 +94,10 @@ class KafkaFetchRecords implements Runnable {
             startPolling();
         } while ((isRetrying() || isReconnect()) && isKafkaConsumerRunnable());
 
-        LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: 
{}", threadId, topicName);
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Terminating KafkaConsumer thread: {} receiving from {}", 
threadId, getPrintableTopic());
+        }
+
         safeUnsubscribe();
         IOHelper.close(consumer);
     }
@@ -131,14 +134,16 @@ class KafkaFetchRecords implements Runnable {
 
     private void subscribe() {
         PartitionAssignmentListener listener = new PartitionAssignmentListener(
-                threadId, topicName,
-                kafkaConsumer.getEndpoint().getConfiguration(), consumer, 
lastProcessedOffset, this::isRunnable);
+                threadId, kafkaConsumer.getEndpoint().getConfiguration(), 
consumer, lastProcessedOffset,
+                this::isRunnable);
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Subscribing {} to {}", threadId, getPrintableTopic());
+        }
 
         if (topicPattern != null) {
-            LOG.info("Subscribing {} to topic pattern {}", threadId, 
topicName);
             consumer.subscribe(topicPattern, listener);
         } else {
-            LOG.info("Subscribing {} to topic {}", threadId, topicName);
             consumer.subscribe(Arrays.asList(topicName.split(",")), listener);
         }
     }
@@ -154,7 +159,10 @@ class KafkaFetchRecords implements Runnable {
             lock.lock();
 
             long pollTimeoutMs = 
kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
-            LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, 
topicName, pollTimeoutMs);
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Polling {} from {} with timeout: {}", threadId, 
getPrintableTopic(), pollTimeoutMs);
+            }
 
             KafkaRecordProcessorFacade recordProcessorFacade = new 
KafkaRecordProcessorFacade(
                     kafkaConsumer,
@@ -189,20 +197,24 @@ class KafkaFetchRecords implements Runnable {
                     e);
             commit();
 
-            LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
+            LOG.info("Unsubscribing {} from {}", threadId, 
getPrintableTopic());
             safeUnsubscribe();
             Thread.currentThread().interrupt();
         } catch (WakeupException e) {
             // This is normal: it raises this exception when calling the 
wakeUp (which happens when we stop)
-            LOG.trace("The kafka consumer was woken up while polling on thread 
{} for topic {}", threadId, topicName);
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("The kafka consumer was woken up while polling on 
thread {} for {}", threadId, getPrintableTopic());
+            }
+
             safeUnsubscribe();
         } catch (Exception e) {
             if (LOG.isDebugEnabled()) {
-                LOG.warn("Exception {} caught while polling {} from kafka 
topic {} at offset {}: {}",
-                        e.getClass().getName(), threadId, topicName, 
lastProcessedOffset, e.getMessage(), e);
+                LOG.warn("Exception {} caught while polling {} from kafka {} 
at offset {}: {}",
+                        e.getClass().getName(), threadId, getPrintableTopic(), 
lastProcessedOffset, e.getMessage(), e);
             } else {
-                LOG.warn("Exception {} caught while polling {} from kafka 
topic {} at offset {}: {}",
-                        e.getClass().getName(), threadId, topicName, 
lastProcessedOffset, e.getMessage());
+                LOG.warn("Exception {} caught while polling {} from kafka {} 
at offset {}: {}",
+                        e.getClass().getName(), threadId, getPrintableTopic(), 
lastProcessedOffset, e.getMessage());
             }
 
             handleAccordingToStrategy(partitionLastOffset, e);
@@ -258,9 +270,9 @@ class KafkaFetchRecords implements Runnable {
      */
     private String getPrintableTopic() {
         if (topicPattern != null) {
-            return "topic pattern" + topicPattern;
+            return "topic pattern " + topicPattern;
         } else {
-            return "topic" + topicName;
+            return "topic " + topicName;
         }
     }
 
@@ -268,13 +280,13 @@ class KafkaFetchRecords implements Runnable {
         processAsyncCommits();
         if (isAutoCommitEnabled()) {
             if 
("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
 {
-                LOG.info("Auto commitAsync on stop {} from topic {}", 
threadId, topicName);
+                LOG.info("Auto commitAsync on stop {} from {}", threadId, 
getPrintableTopic());
                 consumer.commitAsync();
             } else if 
("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
 {
-                LOG.info("Auto commitSync on stop {} from topic {}", threadId, 
topicName);
+                LOG.info("Auto commitSync on stop {} from {}", threadId, 
getPrintableTopic());
                 consumer.commitSync();
             } else if 
("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
 {
-                LOG.info("Auto commit on stop {} from topic {} is disabled 
(none)", threadId, topicName);
+                LOG.info("Auto commit on stop {} from {} is disabled (none)", 
threadId, getPrintableTopic());
             }
         }
     }
@@ -335,7 +347,12 @@ class KafkaFetchRecords implements Runnable {
         Set<TopicPartition> tps = consumer.assignment();
         if (tps != null && partitionLastOffset != -1) {
             long next = partitionLastOffset + 1;
-            LOG.info("Consumer seeking to next offset {} to continue polling 
next message from topic: {}", next, topicName);
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Consumer seeking to next offset {} to continue 
polling next message from {}", next,
+                        getPrintableTopic());
+            }
+
             for (TopicPartition tp : tps) {
                 consumer.seek(tp, next);
             }
@@ -343,8 +360,8 @@ class KafkaFetchRecords implements Runnable {
             for (TopicPartition tp : tps) {
                 long next = consumer.position(tp) + 1;
                 if (!logged) {
-                    LOG.info("Consumer seeking to next offset {} to continue 
polling next message from topic: {}", next,
-                            topicName);
+                    LOG.info("Consumer seeking to next offset {} to continue 
polling next message from {}", next,
+                            getPrintableTopic());
                     logged = true;
                 }
                 consumer.seek(tp, next);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index 07d914a..51854b1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -34,18 +34,16 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
     private static final Logger LOG = 
LoggerFactory.getLogger(PartitionAssignmentListener.class);
 
     private final String threadId;
-    private final String topicName;
     private final KafkaConfiguration configuration;
     private final Consumer consumer;
     private final Map<String, Long> lastProcessedOffset;
     private final KafkaConsumerResumeStrategy resumeStrategy;
     private Supplier<Boolean> stopStateSupplier;
 
-    public PartitionAssignmentListener(String threadId, String topicName, 
KafkaConfiguration configuration,
+    public PartitionAssignmentListener(String threadId, KafkaConfiguration 
configuration,
                                        Consumer consumer, Map<String, Long> 
lastProcessedOffset,
                                        Supplier<Boolean> stopStateSupplier) {
         this.threadId = threadId;
-        this.topicName = topicName;
         this.configuration = configuration;
         this.consumer = consumer;
         this.lastProcessedOffset = lastProcessedOffset;
@@ -56,12 +54,13 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-        LOG.debug("onPartitionsRevoked: {} from topic {}", threadId, 
topicName);
 
         // if camel is stopping, or we are not running
         boolean stopping = stopStateSupplier.get();
 
         for (TopicPartition partition : partitions) {
+            LOG.debug("onPartitionsRevoked: {} from {}", threadId, 
partition.topic());
+
             String offsetKey = serializeOffsetKey(partition);
             Long offset = lastProcessedOffset.get(offsetKey);
             if (offset == null) {
@@ -84,7 +83,9 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-        LOG.debug("onPartitionsAssigned: {} from topic {}", threadId, 
topicName);
+        if (LOG.isDebugEnabled()) {
+            partitions.forEach(p -> LOG.debug("onPartitionsAssigned: {} from 
{}", threadId, p.topic()));
+        }
 
         resumeStrategy.resume(consumer);
     }

Reply via email to