This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c9cf6e2 MINOR: Subscribe/assign calls should be logged at info level (#6299) c9cf6e2 is described below commit c9cf6e2176bec3091c069031c70e46b0162f8095 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Thu Feb 21 13:03:34 2019 -0800 MINOR: Subscribe/assign calls should be logged at info level (#6299) Since we are logging offset resets and such at info level, it makes sense to use the same level for subscriptions and assignments. Reviewers: Ismael Juma <ism...@juma.me.uk> --- .../main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 6 +++--- .../kafka/clients/consumer/internals/ConsumerCoordinator.java | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index b9c811d..29fda34 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -933,7 +933,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { throwIfNoAssignorsConfigured(); fetcher.clearBufferedDataForUnassignedTopics(topics); - log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); + log.info("Subscribed to topic(s): {}", Utils.join(topics, ", ")); this.subscriptions.subscribe(new HashSet<>(topics), listener); metadata.setTopics(subscriptions.groupSubscription()); } @@ -996,7 +996,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { acquireAndEnsureOpen(); try { throwIfNoAssignorsConfigured(); - log.debug("Subscribed to pattern: {}", pattern); + log.info("Subscribed to pattern: '{}'", pattern); this.subscriptions.subscribe(pattern, listener); this.metadata.needMetadataForAllTopics(true); this.coordinator.updatePatternSubscription(metadata.fetch()); @@ -1087,7 +1087,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { if (coordinator != null) this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds()); - log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", ")); + log.info("Subscribed to partition(s): {}", Utils.join(partitions, ", ")); this.subscriptions.assignFromUser(new HashSet<>(partitions)); metadata.setTopics(topics); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index f9c4257..4ee7519 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -50,6 +50,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import java.nio.ByteBuffer; @@ -285,7 +286,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // execute the user's callback after rebalance ConsumerRebalanceListener listener = subscriptions.rebalanceListener(); - log.info("Setting newly assigned partitions {}", assignedPartitions); + log.info("Setting newly assigned partitions: {}", Utils.join(assignedPartitions, ", ")); try { listener.onPartitionsAssigned(assignedPartitions); } catch (WakeupException | InterruptException e) {