This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 744f2b4 CAMEL-17509: fix invalid topic info displayed when using topic patterns
744f2b4 is described below
commit 744f2b4d78edcde9e1fb02916e11dc7db7d4d821
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Jan 18 12:13:45 2022 +0100
CAMEL-17509: fix invalid topic info displayed when using topic patterns
---
.../camel/component/kafka/KafkaFetchRecords.java | 57 ++++++++++++++--------
.../support/PartitionAssignmentListener.java | 11 +++--
2 files changed, 43 insertions(+), 25 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index a2347fa..0d05e78 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -94,7 +94,10 @@ class KafkaFetchRecords implements Runnable {
startPolling();
} while ((isRetrying() || isReconnect()) && isKafkaConsumerRunnable());
- LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Terminating KafkaConsumer thread: {} receiving from {}", threadId, getPrintableTopic());
+ }
+
safeUnsubscribe();
IOHelper.close(consumer);
}
@@ -131,14 +134,16 @@ class KafkaFetchRecords implements Runnable {
private void subscribe() {
PartitionAssignmentListener listener = new PartitionAssignmentListener(
- threadId, topicName,
- kafkaConsumer.getEndpoint().getConfiguration(), consumer,
lastProcessedOffset, this::isRunnable);
+ threadId, kafkaConsumer.getEndpoint().getConfiguration(),
consumer, lastProcessedOffset,
+ this::isRunnable);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Subscribing {} to {}", threadId, getPrintableTopic());
+ }
if (topicPattern != null) {
- LOG.info("Subscribing {} to topic pattern {}", threadId,
topicName);
consumer.subscribe(topicPattern, listener);
} else {
- LOG.info("Subscribing {} to topic {}", threadId, topicName);
consumer.subscribe(Arrays.asList(topicName.split(",")), listener);
}
}
@@ -154,7 +159,10 @@ class KafkaFetchRecords implements Runnable {
lock.lock();
long pollTimeoutMs =
kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
- LOG.trace("Polling {} from topic: {} with timeout: {}", threadId,
topicName, pollTimeoutMs);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling {} from {} with timeout: {}", threadId,
getPrintableTopic(), pollTimeoutMs);
+ }
KafkaRecordProcessorFacade recordProcessorFacade = new
KafkaRecordProcessorFacade(
kafkaConsumer,
@@ -189,20 +197,24 @@ class KafkaFetchRecords implements Runnable {
e);
commit();
- LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
+ LOG.info("Unsubscribing {} from {}", threadId,
getPrintableTopic());
safeUnsubscribe();
Thread.currentThread().interrupt();
} catch (WakeupException e) {
// This is normal: it raises this exception when calling the
wakeUp (which happens when we stop)
- LOG.trace("The kafka consumer was woken up while polling on thread
{} for topic {}", threadId, topicName);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("The kafka consumer was woken up while polling on
thread {} for {}", threadId, getPrintableTopic());
+ }
+
safeUnsubscribe();
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
- LOG.warn("Exception {} caught while polling {} from kafka
topic {} at offset {}: {}",
- e.getClass().getName(), threadId, topicName,
lastProcessedOffset, e.getMessage(), e);
+ LOG.warn("Exception {} caught while polling {} from kafka {}
at offset {}: {}",
+ e.getClass().getName(), threadId, getPrintableTopic(),
lastProcessedOffset, e.getMessage(), e);
} else {
- LOG.warn("Exception {} caught while polling {} from kafka
topic {} at offset {}: {}",
- e.getClass().getName(), threadId, topicName,
lastProcessedOffset, e.getMessage());
+ LOG.warn("Exception {} caught while polling {} from kafka {}
at offset {}: {}",
+ e.getClass().getName(), threadId, getPrintableTopic(),
lastProcessedOffset, e.getMessage());
}
handleAccordingToStrategy(partitionLastOffset, e);
@@ -258,9 +270,9 @@ class KafkaFetchRecords implements Runnable {
*/
private String getPrintableTopic() {
if (topicPattern != null) {
- return "topic pattern" + topicPattern;
+ return "topic pattern " + topicPattern;
} else {
- return "topic" + topicName;
+ return "topic " + topicName;
}
}
@@ -268,13 +280,13 @@ class KafkaFetchRecords implements Runnable {
processAsyncCommits();
if (isAutoCommitEnabled()) {
if
("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
{
- LOG.info("Auto commitAsync on stop {} from topic {}",
threadId, topicName);
+ LOG.info("Auto commitAsync on stop {} from {}", threadId,
getPrintableTopic());
consumer.commitAsync();
} else if
("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
{
- LOG.info("Auto commitSync on stop {} from topic {}", threadId,
topicName);
+ LOG.info("Auto commitSync on stop {} from {}", threadId,
getPrintableTopic());
consumer.commitSync();
} else if
("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
{
- LOG.info("Auto commit on stop {} from topic {} is disabled
(none)", threadId, topicName);
+ LOG.info("Auto commit on stop {} from {} is disabled (none)",
threadId, getPrintableTopic());
}
}
}
@@ -335,7 +347,12 @@ class KafkaFetchRecords implements Runnable {
Set<TopicPartition> tps = consumer.assignment();
if (tps != null && partitionLastOffset != -1) {
long next = partitionLastOffset + 1;
- LOG.info("Consumer seeking to next offset {} to continue polling
next message from topic: {}", next, topicName);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Consumer seeking to next offset {} to continue
polling next message from {}", next,
+ getPrintableTopic());
+ }
+
for (TopicPartition tp : tps) {
consumer.seek(tp, next);
}
@@ -343,8 +360,8 @@ class KafkaFetchRecords implements Runnable {
for (TopicPartition tp : tps) {
long next = consumer.position(tp) + 1;
if (!logged) {
- LOG.info("Consumer seeking to next offset {} to continue
polling next message from topic: {}", next,
- topicName);
+ LOG.info("Consumer seeking to next offset {} to continue
polling next message from {}", next,
+ getPrintableTopic());
logged = true;
}
consumer.seek(tp, next);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index 07d914a..51854b1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -34,18 +34,16 @@ public class PartitionAssignmentListener implements
ConsumerRebalanceListener {
private static final Logger LOG =
LoggerFactory.getLogger(PartitionAssignmentListener.class);
private final String threadId;
- private final String topicName;
private final KafkaConfiguration configuration;
private final Consumer consumer;
private final Map<String, Long> lastProcessedOffset;
private final KafkaConsumerResumeStrategy resumeStrategy;
private Supplier<Boolean> stopStateSupplier;
- public PartitionAssignmentListener(String threadId, String topicName,
KafkaConfiguration configuration,
+ public PartitionAssignmentListener(String threadId, KafkaConfiguration
configuration,
Consumer consumer, Map<String, Long>
lastProcessedOffset,
Supplier<Boolean> stopStateSupplier) {
this.threadId = threadId;
- this.topicName = topicName;
this.configuration = configuration;
this.consumer = consumer;
this.lastProcessedOffset = lastProcessedOffset;
@@ -56,12 +54,13 @@ public class PartitionAssignmentListener implements
ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- LOG.debug("onPartitionsRevoked: {} from topic {}", threadId,
topicName);
// if camel is stopping, or we are not running
boolean stopping = stopStateSupplier.get();
for (TopicPartition partition : partitions) {
+ LOG.debug("onPartitionsRevoked: {} from {}", threadId,
partition.topic());
+
String offsetKey = serializeOffsetKey(partition);
Long offset = lastProcessedOffset.get(offsetKey);
if (offset == null) {
@@ -84,7 +83,9 @@ public class PartitionAssignmentListener implements
ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- LOG.debug("onPartitionsAssigned: {} from topic {}", threadId,
topicName);
+ if (LOG.isDebugEnabled()) {
+ partitions.forEach(p -> LOG.debug("onPartitionsAssigned: {} from
{}", threadId, p.topic()));
+ }
resumeStrategy.resume(consumer);
}