MINOR: Ensure consumer logging has clientId/groupId context. This patch ensures that the consumer groupId and clientId are available in all log messages, which makes debugging much easier when a single application has multiple consumer instances. To make this easier, I've added a new `LogContext` object which builds a log prefix similar to the broker-side `kafka.utils.Logging` mixin. Additionally, this patch changes the log level for a couple of minor cases:
- Consumer wakeup events are now logged at DEBUG instead of TRACE - Heartbeat enabling/disabling is now logged at DEBUG instead of TRACE Author: Jason Gustafson <ja...@confluent.io> Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>, Ismael Juma <ism...@juma.me.uk> Closes #3676 from hachikuji/log-consumer-wakeups Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6896f1dd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6896f1dd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6896f1dd Branch: refs/heads/trunk Commit: 6896f1ddb7650f42630aef8c67c8b61866e9fc00 Parents: ed96523 Author: Jason Gustafson <ja...@confluent.io> Authored: Sat Aug 19 11:17:02 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Sat Aug 19 11:17:02 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 81 ++-- .../consumer/internals/AbstractCoordinator.java | 84 ++-- .../consumer/internals/ConsumerCoordinator.java | 148 ++++--- .../internals/ConsumerNetworkClient.java | 35 +- .../clients/consumer/internals/Fetcher.java | 10 +- .../apache/kafka/common/utils/LogContext.java | 381 +++++++++++++++++++ .../clients/consumer/KafkaConsumerTest.java | 11 +- .../internals/AbstractCoordinatorTest.java | 7 +- .../internals/ConsumerCoordinatorTest.java | 4 +- .../internals/ConsumerNetworkClientTest.java | 15 +- .../clients/consumer/internals/FetcherTest.java | 8 +- .../runtime/distributed/WorkerCoordinator.java | 12 +- .../runtime/distributed/WorkerGroupMember.java | 24 +- .../distributed/WorkerCoordinatorTest.java | 9 +- .../main/scala/kafka/admin/AdminClient.scala | 3 +- 15 files changed, 628 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index f1351b7..073b2df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -49,10 +49,10 @@ import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.Collection; @@ -541,7 +541,6 @@ import java.util.regex.Pattern; */ public class KafkaConsumer<K, V> implements Consumer<K, V> { - private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); private static final long NO_CURRENT_THREAD = -1L; private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); private static final String JMX_PREFIX = "kafka.consumer"; @@ -550,6 +549,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { // Visible for testing final Metrics metrics; + private final Logger log; private final String clientId; private final ConsumerCoordinator coordinator; private final Deserializer<K> keyDeserializer; @@ -640,7 +640,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { try { - log.debug("Starting the Kafka consumer"); + String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); + if (clientId.isEmpty()) + clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); + this.clientId = 
clientId; + String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG); + + LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] "); + this.log = logContext.logger(getClass()); + + log.debug("Initializing the Kafka consumer"); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); @@ -648,10 +657,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); this.time = Time.SYSTEM; - String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); - if (clientId.length() <= 0) - clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); - this.clientId = clientId; Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) @@ -712,31 +717,39 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { true, new ApiVersions(), throttleTimeSensor); - this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs, + this.client = new ConsumerNetworkClient( + logContext, + netClient, + metadata, + time, + retryBackoffMs, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT)); this.subscriptions = new SubscriptionState(offsetResetStrategy); List<PartitionAssignor> assignors = config.getConfiguredInstances( 
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, PartitionAssignor.class); - this.coordinator = new ConsumerCoordinator(this.client, - config.getString(ConsumerConfig.GROUP_ID_CONFIG), - config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), - config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), - config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), - assignors, - this.metadata, - this.subscriptions, - metrics, - metricGrpPrefix, - this.time, - retryBackoffMs, - config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), - config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), - this.interceptors, - config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG), - config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG)); - this.fetcher = new Fetcher<>(this.client, + this.coordinator = new ConsumerCoordinator(logContext, + this.client, + groupId, + config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), + config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), + config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), + assignors, + this.metadata, + this.subscriptions, + metrics, + metricGrpPrefix, + this.time, + retryBackoffMs, + config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), + config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), + this.interceptors, + config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG), + config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG)); + this.fetcher = new Fetcher<>( + logContext, + this.client, config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), @@ -756,7 +769,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); - log.debug("Kafka consumer with client id {} created", clientId); + log.debug("Kafka consumer initialized"); } catch (Throwable t) { // call close 
methods if internal objects are already constructed // this is to prevent resource leak. see KAFKA-2121 @@ -767,7 +780,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } // visible for testing - KafkaConsumer(String clientId, + KafkaConsumer(LogContext logContext, + String clientId, ConsumerCoordinator coordinator, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, @@ -780,6 +794,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { Metadata metadata, long retryBackoffMs, long requestTimeoutMs) { + this.log = logContext.logger(getClass()); this.clientId = clientId; this.coordinator = coordinator; this.keyDeserializer = keyDeserializer; @@ -1242,7 +1257,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { acquireAndEnsureOpen(); try { - log.debug("Committing offsets: {} ", offsets); + log.debug("Committing offsets: {}", offsets); coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback); } finally { release(); @@ -1440,8 +1455,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public void pause(Collection<TopicPartition> partitions) { acquireAndEnsureOpen(); try { + log.debug("Pausing partitions {}", partitions); for (TopicPartition partition: partitions) { - log.debug("Pausing partition {}", partition); subscriptions.pause(partition); } } finally { @@ -1459,8 +1474,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public void resume(Collection<TopicPartition> partitions) { acquireAndEnsureOpen(); try { + log.debug("Resuming partitions {}", partitions); for (TopicPartition partition: partitions) { - log.debug("Resuming partition {}", partition); subscriptions.resume(partition); } } finally { @@ -1630,7 +1645,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } private void close(long timeoutMs, boolean swallowException) { - log.trace("Closing the 
Kafka consumer."); + log.trace("Closing the Kafka consumer"); AtomicReference<Throwable> firstException = new AtomicReference<>(); try { if (coordinator != null) @@ -1646,7 +1661,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException); ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException); AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId); - log.debug("Kafka consumer with client id {} has been closed", clientId); + log.debug("Kafka consumer has been closed"); Throwable exception = firstException.get(); if (exception != null && !swallowException) { if (exception instanceof InterruptException) { http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 74ef20a..dcf837b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -48,9 +48,9 @@ import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.Closeable; import java.nio.ByteBuffer; @@ -90,8 +90,6 @@ import java.util.concurrent.atomic.AtomicReference; * when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup). 
*/ public abstract class AbstractCoordinator implements Closeable { - - private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class); public static final String HEARTBEAT_THREAD_PREFIX = "kafka-coordinator-heartbeat-thread"; private enum MemberState { @@ -100,11 +98,12 @@ public abstract class AbstractCoordinator implements Closeable { STABLE, // the client has joined and is sending heartbeats } - protected final int rebalanceTimeoutMs; + private final Logger log; private final int sessionTimeoutMs; private final boolean leaveGroupOnClose; private final GroupCoordinatorMetrics sensors; private final Heartbeat heartbeat; + protected final int rebalanceTimeoutMs; protected final String groupId; protected final ConsumerNetworkClient client; protected final Time time; @@ -123,7 +122,8 @@ public abstract class AbstractCoordinator implements Closeable { /** * Initialize the coordination manager. */ - public AbstractCoordinator(ConsumerNetworkClient client, + public AbstractCoordinator(LogContext logContext, + ConsumerNetworkClient client, String groupId, int rebalanceTimeoutMs, int sessionTimeoutMs, @@ -133,6 +133,7 @@ public abstract class AbstractCoordinator implements Closeable { Time time, long retryBackoffMs, boolean leaveGroupOnClose) { + this.log = logContext.logger(AbstractCoordinator.class); this.client = client; this.time = time; this.groupId = groupId; @@ -222,7 +223,7 @@ public abstract class AbstractCoordinator implements Closeable { if (remainingMs <= 0) break; - log.debug("Coordinator discovery failed for group {}, refreshing metadata", groupId); + log.debug("Coordinator discovery failed, refreshing metadata"); client.awaitMetadataUpdate(remainingMs); } else throw future.exception(); @@ -246,7 +247,7 @@ public abstract class AbstractCoordinator implements Closeable { // find a node to ask about the coordinator Node node = this.client.leastLoadedNode(); if (node == null) { - log.debug("No broker available to send GroupCoordinator 
request for group {}", groupId); + log.debug("No broker available to send GroupCoordinator request"); return RequestFuture.noBrokersAvailable(); } else findCoordinatorFuture = sendGroupCoordinatorRequest(node); @@ -404,7 +405,7 @@ public abstract class AbstractCoordinator implements Closeable { // handle join completion in the callback so that the callback will be invoked // even if the consumer is woken up before finishing the rebalance synchronized (AbstractCoordinator.this) { - log.info("Successfully joined group {} with generation {}", groupId, generation.generationId); + log.info("Successfully joined group with generation {}", generation.generationId); state = MemberState.STABLE; rejoinNeeded = false; @@ -437,7 +438,7 @@ public abstract class AbstractCoordinator implements Closeable { return RequestFuture.coordinatorNotAvailable(); // send a join group request to the coordinator - log.info("(Re-)joining group {}", groupId); + log.info("(Re-)joining group"); JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder( groupId, this.sessionTimeoutMs, @@ -455,7 +456,7 @@ public abstract class AbstractCoordinator implements Closeable { public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) { Errors error = joinResponse.error(); if (error == Errors.NONE) { - log.debug("Received successful JoinGroup response for group {}: {}", groupId, joinResponse); + log.debug("Received successful JoinGroup response: {}", joinResponse); sensors.joinLatency.record(response.requestLatencyMs()); synchronized (AbstractCoordinator.this) { @@ -474,26 +475,25 @@ public abstract class AbstractCoordinator implements Closeable { } } } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { - log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId, - coordinator()); + log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator()); // backoff and retry 
future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID) { // reset the member id and retry immediately resetGeneration(); - log.debug("Attempt to join group {} failed due to unknown member id.", groupId); + log.debug("Attempt to join group failed due to unknown member id."); future.raise(Errors.UNKNOWN_MEMBER_ID); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry with backoff coordinatorDead(); - log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message()); + log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message()); future.raise(error); } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL || error == Errors.INVALID_SESSION_TIMEOUT || error == Errors.INVALID_GROUP_ID) { // log the error and re-throw the exception - log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message()); + log.error("Attempt to join group failed due to fatal error: {}", error.message()); future.raise(error); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); @@ -509,8 +509,7 @@ public abstract class AbstractCoordinator implements Closeable { SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, Collections.<String, ByteBuffer>emptyMap()); - log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, - requestBuilder); + log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder); return sendSyncGroupRequest(requestBuilder); } @@ -522,8 +521,7 @@ public abstract class AbstractCoordinator implements Closeable { SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment); - 
log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", - groupId, this.coordinator, requestBuilder); + log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder); return sendSyncGroupRequest(requestBuilder); } catch (RuntimeException e) { return RequestFuture.failure(e); @@ -551,16 +549,16 @@ public abstract class AbstractCoordinator implements Closeable { if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else if (error == Errors.REBALANCE_IN_PROGRESS) { - log.debug("SyncGroup for group {} failed due to coordinator rebalance", groupId); + log.debug("SyncGroup failed due to group rebalance"); future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) { - log.debug("SyncGroup for group {} failed due to {}", groupId, error); + log.debug("SyncGroup failed: {}", error.message()); resetGeneration(); future.raise(error); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { - log.debug("SyncGroup for group {} failed due to {}", groupId, error); + log.debug("SyncGroup failed:", error.message()); coordinatorDead(); future.raise(error); } else { @@ -577,7 +575,7 @@ public abstract class AbstractCoordinator implements Closeable { */ private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) { // initiate the group metadata request - log.debug("Sending GroupCoordinator request for group {} to broker {}", groupId, node); + log.debug("Sending GroupCoordinator request to broker {}", node); FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId); return client.send(node, requestBuilder) @@ -588,7 +586,7 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void onSuccess(ClientResponse resp, RequestFuture<Void> future) { - log.debug("Received GroupCoordinator 
response {} for group {}", resp, groupId); + log.debug("Received GroupCoordinator response {}", resp); FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody(); // use MAX_VALUE - node.id as the coordinator id to mimic separate connections @@ -602,7 +600,7 @@ public abstract class AbstractCoordinator implements Closeable { Integer.MAX_VALUE - findCoordinatorResponse.node().id(), findCoordinatorResponse.node().host(), findCoordinatorResponse.node().port()); - log.info("Discovered coordinator {} for group {}.", coordinator, groupId); + log.info("Discovered coordinator {}", coordinator); client.tryConnect(coordinator); heartbeat.resetTimeouts(time.milliseconds()); } @@ -610,7 +608,7 @@ public abstract class AbstractCoordinator implements Closeable { } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { - log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message()); + log.debug("Group coordinator lookup failed: {}", error.message()); future.raise(error); } } @@ -647,7 +645,7 @@ public abstract class AbstractCoordinator implements Closeable { */ protected synchronized void coordinatorDead() { if (this.coordinator != null) { - log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId); + log.info("Marking the coordinator {} dead", this.coordinator); // Disconnect from the coordinator to ensure that there are no in-flight requests remaining. // Pending callbacks will be invoked with a DisconnectException. @@ -705,8 +703,8 @@ public abstract class AbstractCoordinator implements Closeable { // If coordinator is not known, requests are aborted. 
Node coordinator = coordinator(); if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs)) - log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.", - client.pendingRequestCount(coordinator), groupId); + log.warn("Close timed out with {} pending requests to coordinator, terminating client connections", + client.pendingRequestCount(coordinator)); } } } @@ -718,7 +716,7 @@ public abstract class AbstractCoordinator implements Closeable { if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) { // this is a minimal effort attempt to leave the group. we do not // attempt any resending if the request fails or times out. - log.debug("Sending LeaveGroup request to coordinator {} for group {}", coordinator, groupId); + log.debug("Sending LeaveGroup request to coordinator {}", coordinator); LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(groupId, generation.memberId); client.send(coordinator, request) @@ -734,10 +732,10 @@ public abstract class AbstractCoordinator implements Closeable { public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) { Errors error = leaveResponse.error(); if (error == Errors.NONE) { - log.debug("LeaveGroup request for group {} returned successfully", groupId); + log.debug("LeaveGroup request returned successfully"); future.complete(null); } else { - log.debug("LeaveGroup request for group {} failed with error: {}", groupId, error.message()); + log.debug("LeaveGroup request failed with error: {}", error.message()); future.raise(error); } } @@ -745,7 +743,7 @@ public abstract class AbstractCoordinator implements Closeable { // visible for testing synchronized RequestFuture<Void> sendHeartbeatRequest() { - log.debug("Sending Heartbeat request for group {} to coordinator {}", groupId, coordinator); + log.debug("Sending Heartbeat request to coordinator {}", coordinator); 
HeartbeatRequest.Builder requestBuilder = new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId); return client.send(coordinator, requestBuilder) @@ -758,24 +756,24 @@ public abstract class AbstractCoordinator implements Closeable { sensors.heartbeatLatency.record(response.requestLatencyMs()); Errors error = heartbeatResponse.error(); if (error == Errors.NONE) { - log.debug("Received successful Heartbeat response for group {}", groupId); + log.debug("Received successful Heartbeat response"); future.complete(null); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { - log.debug("Attempt to heartbeat failed for group {} since coordinator {} is either not started or not valid.", - groupId, coordinator()); + log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.", + coordinator()); coordinatorDead(); future.raise(error); } else if (error == Errors.REBALANCE_IN_PROGRESS) { - log.debug("Attempt to heartbeat failed for group {} since it is rebalancing.", groupId); + log.debug("Attempt to heartbeat failed since group is rebalancing"); requestRejoin(); future.raise(Errors.REBALANCE_IN_PROGRESS); } else if (error == Errors.ILLEGAL_GENERATION) { - log.debug("Attempt to heartbeat failed for group {} since generation id is not legal.", groupId); + log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId); resetGeneration(); future.raise(Errors.ILLEGAL_GENERATION); } else if (error == Errors.UNKNOWN_MEMBER_ID) { - log.debug("Attempt to heartbeat failed for group {} since member id is not valid.", groupId); + log.debug("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId); resetGeneration(); future.raise(Errors.UNKNOWN_MEMBER_ID); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { @@ -879,7 +877,7 @@ public abstract class AbstractCoordinator implements Closeable { public void 
enable() { synchronized (AbstractCoordinator.this) { - log.trace("Enabling heartbeat thread for group {}", groupId); + log.debug("Enabling heartbeat thread"); this.enabled = true; heartbeat.resetTimeouts(time.milliseconds()); AbstractCoordinator.this.notify(); @@ -888,7 +886,7 @@ public abstract class AbstractCoordinator implements Closeable { public void disable() { synchronized (AbstractCoordinator.this) { - log.trace("Disabling heartbeat thread for group {}", groupId); + log.debug("Disabling heartbeat thread"); this.enabled = false; } } @@ -911,7 +909,7 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void run() { try { - log.debug("Heartbeat thread for group {} started", groupId); + log.debug("Heartbeat thread started"); while (true) { synchronized (AbstractCoordinator.this) { if (closed) @@ -989,7 +987,7 @@ public abstract class AbstractCoordinator implements Closeable { log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e); this.failed.set(e); } finally { - log.debug("Heartbeat thread for group {} has closed", groupId); + log.debug("Heartbeat thread has closed"); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 5ba9ccb..3a80d06 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -47,9 +47,9 @@ import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; import 
org.apache.kafka.common.requests.OffsetFetchRequest; import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -66,9 +66,7 @@ import java.util.concurrent.atomic.AtomicInteger; * This class manages the coordination process with the consumer coordinator. */ public final class ConsumerCoordinator extends AbstractCoordinator { - - private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class); - + private final Logger log; private final List<PartitionAssignor> assignors; private final Metadata metadata; private final ConsumerCoordinatorMetrics sensors; @@ -93,7 +91,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { /** * Initialize the coordination manager. */ - public ConsumerCoordinator(ConsumerNetworkClient client, + public ConsumerCoordinator(LogContext logContext, + ConsumerNetworkClient client, String groupId, int rebalanceTimeoutMs, int sessionTimeoutMs, @@ -110,7 +109,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { ConsumerInterceptors<?, ?> interceptors, boolean excludeInternalTopics, final boolean leaveGroupOnClose) { - super(client, + super(logContext, + client, groupId, rebalanceTimeoutMs, sessionTimeoutMs, @@ -120,6 +120,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { time, retryBackoffMs, leaveGroupOnClose); + this.log = logContext.logger(ConsumerCoordinator.class); this.metadata = metadata; this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch()); this.subscriptions = subscriptions; @@ -259,15 +260,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // execute the user's callback after rebalance ConsumerRebalanceListener listener = subscriptions.listener(); - log.info("Setting newly assigned partitions {} 
for group {}", subscriptions.assignedPartitions(), groupId); + log.info("Setting newly assigned partitions {}", subscriptions.assignedPartitions()); try { Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions()); listener.onPartitionsAssigned(assigned); } catch (WakeupException | InterruptException e) { throw e; } catch (Exception e) { - log.error("User provided listener {} for group {} failed on partition assignment", - listener.getClass().getName(), groupId, e); + log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e); } } @@ -359,8 +359,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { isLeader = true; - log.debug("Performing assignment for group {} using strategy {} with subscriptions {}", - groupId, assignor.name(), subscriptions); + log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions); Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions); @@ -380,15 +379,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (!assignedTopics.containsAll(allSubscribedTopics)) { Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics); notAssignedTopics.removeAll(assignedTopics); - log.warn("The following subscribed topics are not assigned to any members in the group {} : {} ", groupId, - notAssignedTopics); + log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics); } if (!allSubscribedTopics.containsAll(assignedTopics)) { Set<String> newlyAddedTopics = new HashSet<>(assignedTopics); newlyAddedTopics.removeAll(allSubscribedTopics); - log.info("The following not-subscribed topics are assigned to group {}, and their metadata will be " + - "fetched from the brokers : {}", groupId, newlyAddedTopics); + log.info("The following not-subscribed topics are assigned, and their metadata will be " + + "fetched from the brokers: {}", 
newlyAddedTopics); allSubscribedTopics.addAll(assignedTopics); this.subscriptions.groupSubscribe(allSubscribedTopics); @@ -398,7 +396,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { assignmentSnapshot = metadataSnapshot; - log.debug("Finished assignment for group {}: {}", groupId, assignment); + log.debug("Finished assignment for group: {}", assignment); Map<String, ByteBuffer> groupAssignment = new HashMap<>(); for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) { @@ -416,15 +414,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // execute the user's callback before rebalance ConsumerRebalanceListener listener = subscriptions.listener(); - log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId); + log.info("Revoking previously assigned partitions {}", subscriptions.assignedPartitions()); try { Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions()); listener.onPartitionsRevoked(revoked); } catch (WakeupException | InterruptException e) { throw e; } catch (Exception e) { - log.error("User provided listener {} for group {} failed on partition revocation", - listener.getClass().getName(), groupId, e); + log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e); } isLeader = false; @@ -645,18 +642,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private void doAutoCommitOffsetsAsync() { Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed(); - log.debug("Sending asynchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId); + log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets); commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != 
null) { - log.warn("Auto-commit of offsets {} failed for group {}: {}", offsets, groupId, - exception.getMessage()); + log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage()); if (exception instanceof RetriableException) nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline); } else { - log.debug("Completed auto-commit of offsets {} for group {}", offsets, groupId); + log.debug("Completed asynchronous auto-commit of offsets {}", offsets); } } }); @@ -666,19 +662,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (autoCommitEnabled) { Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed(); try { - log.debug("Sending synchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId); + log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets); if (!commitOffsetsSync(allConsumedOffsets, timeoutMs)) - log.debug("Auto-commit of offsets {} for group {} timed out before completion", - allConsumedOffsets, groupId); + log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets); } catch (WakeupException | InterruptException e) { - log.debug("Auto-commit of offsets {} for group {} was interrupted before completion", - allConsumedOffsets, groupId); + log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets); // rethrow wakeups since they are triggered by the user throw e; } catch (Exception e) { // consistent with async auto-commit failures, we do not propagate the exception - log.warn("Auto-commit of offsets {} failed for group {}: {}", allConsumedOffsets, groupId, - e.getMessage()); + log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage()); } } } @@ -687,7 +680,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @Override public void onComplete(Map<TopicPartition, 
OffsetAndMetadata> offsets, Exception exception) { if (exception != null) - log.error("Offset commit with offsets {} failed for group {}", offsets, groupId, exception); + log.error("Offset commit with offsets {} failed", offsets, exception); } } @@ -734,7 +727,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { setMemberId(generation.memberId). setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME); - log.trace("Sending OffsetCommit request with {} to coordinator {} for group {}", offsets, coordinator, groupId); + log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator); return client.send(coordinator, builder) .compose(new OffsetCommitResponseHandler(offsets)); @@ -760,55 +753,52 @@ public final class ConsumerCoordinator extends AbstractCoordinator { Errors error = entry.getValue(); if (error == Errors.NONE) { - log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp); + log.debug("Committed offset {} for partition {}", offset, tp); if (subscriptions.isAssigned(tp)) // update the local cache only if the partition is still assigned subscriptions.committed(tp, offsetAndMetadata); - } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - log.error("Not authorized to commit offsets for group {}", groupId); - future.raise(new GroupAuthorizationException(groupId)); - return; - } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { - unauthorizedTopics.add(tp.topic()); - } else if (error == Errors.OFFSET_METADATA_TOO_LARGE - || error == Errors.INVALID_COMMIT_OFFSET_SIZE) { - // raise the error to the user - log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message()); - future.raise(error); - return; - } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { - // just retry - log.debug("Offset commit for group {} failed: {}", groupId, error.message()); - future.raise(error); - return; - } else if (error == Errors.COORDINATOR_NOT_AVAILABLE - || 
error == Errors.NOT_COORDINATOR - || error == Errors.REQUEST_TIMED_OUT) { - log.debug("Offset commit for group {} failed: {}", groupId, error.message()); - coordinatorDead(); - future.raise(error); - return; - } else if (error == Errors.UNKNOWN_MEMBER_ID - || error == Errors.ILLEGAL_GENERATION - || error == Errors.REBALANCE_IN_PROGRESS) { - // need to re-join group - log.debug("Offset commit for group {} failed: {}", groupId, error.message()); - resetGeneration(); - future.raise(new CommitFailedException()); - return; - } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { - log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message()); - future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic")); - return; } else { - log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message()); - future.raise(new KafkaException("Unexpected error in commit: " + error.message())); - return; + log.error("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message()); + + if (error == Errors.GROUP_AUTHORIZATION_FAILED) { + future.raise(new GroupAuthorizationException(groupId)); + return; + } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { + unauthorizedTopics.add(tp.topic()); + } else if (error == Errors.OFFSET_METADATA_TOO_LARGE + || error == Errors.INVALID_COMMIT_OFFSET_SIZE) { + // raise the error to the user + future.raise(error); + return; + } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { + // just retry + future.raise(error); + return; + } else if (error == Errors.COORDINATOR_NOT_AVAILABLE + || error == Errors.NOT_COORDINATOR + || error == Errors.REQUEST_TIMED_OUT) { + coordinatorDead(); + future.raise(error); + return; + } else if (error == Errors.UNKNOWN_MEMBER_ID + || error == Errors.ILLEGAL_GENERATION + || error == Errors.REBALANCE_IN_PROGRESS) { + // need to re-join group + resetGeneration(); + 
future.raise(new CommitFailedException()); + return; + } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { + future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic")); + return; + } else { + future.raise(new KafkaException("Unexpected error in commit: " + error.message())); + return; + } } } if (!unauthorizedTopics.isEmpty()) { - log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId); + log.error("Not authorized to commit to topics {}", unauthorizedTopics); future.raise(new TopicAuthorizationException(unauthorizedTopics)); } else { future.complete(null); @@ -828,10 +818,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (coordinator == null) return RequestFuture.coordinatorNotAvailable(); - log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions); + log.debug("Fetching committed offsets for partitions: {}", partitions); // construct the request - OffsetFetchRequest.Builder requestBuilder = - new OffsetFetchRequest.Builder(this.groupId, new ArrayList<>(partitions)); + OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(this.groupId, + new ArrayList<>(partitions)); // send the request with a callback return client.send(coordinator, requestBuilder) @@ -843,7 +833,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) { if (response.hasError()) { Errors error = response.error(); - log.debug("Offset fetch for group {} failed: {}", groupId, error.message()); + log.debug("Offset fetch failed: {}", error.message()); if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { // just retry @@ -866,7 +856,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { OffsetFetchResponse.PartitionData data = entry.getValue(); if (data.hasError()) { Errors error = 
data.error; - log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message()); + log.debug("Failed to fetch offset for partition {}: {}", tp, error.message()); if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { future.raise(new KafkaException("Partition " + tp + " may not exist or the user may not have " + @@ -879,7 +869,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // record the position with the offset (-1 indicates no committed offset to fetch) offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata)); } else { - log.debug("Group {} has no committed offset for partition {}", groupId, tp); + log.debug("Found no committed offset for partition {}", tp); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index bb7f9f2..4a38d04 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -16,17 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; @@ 
-40,9 +29,21 @@ import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; /** * Higher level consumer access to the network layer with basic support for request futures. This class @@ -50,11 +51,11 @@ import org.slf4j.LoggerFactory; * are held when they are invoked. */ public class ConsumerNetworkClient implements Closeable { - private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class); private static final long MAX_POLL_TIMEOUT_MS = 5000L; // the mutable state of this class is protected by the object's monitor (excluding the wakeup // flag and the request completion queue below). + private final Logger log; private final KafkaClient client; private final UnsentRequests unsent = new UnsentRequests(); private final Metadata metadata; @@ -71,11 +72,13 @@ public class ConsumerNetworkClient implements Closeable { // atomic to avoid the need to acquire the lock above in order to enable it concurrently. 
private final AtomicBoolean wakeup = new AtomicBoolean(false); - public ConsumerNetworkClient(KafkaClient client, + public ConsumerNetworkClient(LogContext logContext, + KafkaClient client, Metadata metadata, Time time, long retryBackoffMs, long requestTimeoutMs) { + this.log = logContext.logger(ConsumerNetworkClient.class); this.client = client; this.metadata = metadata; this.time = time; @@ -152,7 +155,7 @@ public class ConsumerNetworkClient implements Closeable { public void wakeup() { // wakeup should be safe without holding the client lock since it simply delegates to // Selector's wakeup, which is threadsafe - log.trace("Received user wakeup"); + log.debug("Received user wakeup"); this.wakeup.set(true); this.client.wakeup(); } @@ -426,7 +429,7 @@ public class ConsumerNetworkClient implements Closeable { public void maybeTriggerWakeup() { if (!wakeupDisabled.get() && wakeup.get()) { - log.trace("Raising wakeup exception in response to user wakeup"); + log.debug("Raising WakeupException in response to user wakeup"); wakeup.set(false); throw new WakeupException(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 23c5902..01db34f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -62,10 +62,10 @@ import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.ExtendedDeserializer; import org.apache.kafka.common.utils.CloseableIterator; +import org.apache.kafka.common.utils.LogContext; import 
org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.Closeable; import java.nio.ByteBuffer; @@ -91,9 +91,7 @@ import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper * This class manage the fetching process with the brokers. */ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { - - private static final Logger log = LoggerFactory.getLogger(Fetcher.class); - + private final Logger log; private final ConsumerNetworkClient client; private final Time time; private final int minBytes; @@ -115,7 +113,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { private PartitionRecords nextInLineRecords = null; - public Fetcher(ConsumerNetworkClient client, + public Fetcher(LogContext logContext, + ConsumerNetworkClient client, int minBytes, int maxBytes, int maxWaitMs, @@ -131,6 +130,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { Time time, long retryBackoffMs, IsolationLevel isolationLevel) { + this.log = logContext.logger(Fetcher.class); this.time = time; this.client = client; this.metadata = metadata; http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java b/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java new file mode 100644 index 0000000..bbf51fa --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; + +/** + * This class provides a way to instrument loggers with a common context which can be used to + * automatically enrich log messages. For example, in the KafkaConsumer, it is often useful to know + * the groupId of the consumer, so this can be added to a context object which can then be passed to + * all of the dependent components in order to build new loggers. This removes the need to manually + * add the groupId to each message. + */ +public class LogContext { + + private final String logPrefix; + + public LogContext(String logPrefix) { + this.logPrefix = logPrefix == null ? 
"" : logPrefix; + } + + public LogContext() { + this(""); + } + + public Logger logger(Class<?> clazz) { + return new KafkaLogger(clazz, logPrefix); + } + + private static class KafkaLogger implements Logger { + private final Logger logger; + private final String logPrefix; + + public KafkaLogger(Class<?> clazz, String logPrefix) { + this.logger = LoggerFactory.getLogger(clazz); + this.logPrefix = logPrefix; + } + + @Override + public String getName() { + return logger.getName(); + } + + @Override + public boolean isTraceEnabled() { + return logger.isTraceEnabled(); + } + + @Override + public boolean isTraceEnabled(Marker marker) { + return logger.isTraceEnabled(marker); + } + + @Override + public boolean isDebugEnabled() { + return logger.isDebugEnabled(); + } + + @Override + public boolean isDebugEnabled(Marker marker) { + return logger.isDebugEnabled(marker); + } + + @Override + public boolean isInfoEnabled() { + return logger.isInfoEnabled(); + } + + @Override + public boolean isInfoEnabled(Marker marker) { + return logger.isInfoEnabled(marker); + } + + @Override + public boolean isWarnEnabled() { + return logger.isWarnEnabled(); + } + + @Override + public boolean isWarnEnabled(Marker marker) { + return logger.isWarnEnabled(marker); + } + + @Override + public boolean isErrorEnabled() { + return logger.isErrorEnabled(); + } + + @Override + public boolean isErrorEnabled(Marker marker) { + return logger.isErrorEnabled(marker); + } + + @Override + public void trace(String message) { + if (logger.isTraceEnabled()) + logger.trace(logPrefix + message); + } + + @Override + public void trace(String message, Object arg) { + if (logger.isTraceEnabled()) + logger.trace(logPrefix + message, arg); + } + + @Override + public void trace(String message, Object arg1, Object arg2) { + if (logger.isTraceEnabled()) + logger.trace(logPrefix + message, arg1, arg2); + } + + @Override + public void trace(String message, Object... 
args) { + if (logger.isTraceEnabled()) + logger.trace(logPrefix + message, args); + } + + @Override + public void trace(String msg, Throwable t) { + if (logger.isTraceEnabled()) + logger.trace(logPrefix + msg, t); + } + + @Override + public void trace(Marker marker, String msg) { + if (logger.isTraceEnabled()) + logger.trace(marker, logPrefix + msg); + } + + @Override + public void trace(Marker marker, String format, Object arg) { + if (logger.isTraceEnabled()) + logger.trace(marker, logPrefix + format, arg); + } + + @Override + public void trace(Marker marker, String format, Object arg1, Object arg2) { + if (logger.isTraceEnabled()) + logger.trace(marker, logPrefix + format, arg1, arg2); + } + + @Override + public void trace(Marker marker, String format, Object... argArray) { + if (logger.isTraceEnabled()) + logger.trace(marker, logPrefix + format, argArray); + } + + @Override + public void trace(Marker marker, String msg, Throwable t) { + if (logger.isTraceEnabled()) + logger.trace(marker, logPrefix + msg, t); + } + + @Override + public void debug(String message) { + if (logger.isDebugEnabled()) + logger.debug(logPrefix + message); + } + + @Override + public void debug(String message, Object arg) { + if (logger.isDebugEnabled()) + logger.debug(logPrefix + message, arg); + } + + @Override + public void debug(String message, Object arg1, Object arg2) { + if (logger.isDebugEnabled()) + logger.debug(logPrefix + message, arg1, arg2); + } + + @Override + public void debug(String message, Object... 
args) { + if (logger.isDebugEnabled()) + logger.debug(logPrefix + message, args); + } + + @Override + public void debug(String msg, Throwable t) { + if (logger.isDebugEnabled()) + logger.debug(logPrefix + msg, t); + } + + @Override + public void debug(Marker marker, String msg) { + if (logger.isDebugEnabled()) + logger.debug(marker, logPrefix + msg); + } + + @Override + public void debug(Marker marker, String format, Object arg) { + if (logger.isDebugEnabled()) + logger.debug(marker, logPrefix + format, arg); + } + + @Override + public void debug(Marker marker, String format, Object arg1, Object arg2) { + if (logger.isDebugEnabled()) + logger.debug(marker, logPrefix + format, arg1, arg2); + } + + @Override + public void debug(Marker marker, String format, Object... arguments) { + if (logger.isDebugEnabled()) + logger.debug(marker, logPrefix + format, arguments); + } + + @Override + public void debug(Marker marker, String msg, Throwable t) { + if (logger.isDebugEnabled()) + logger.debug(marker, logPrefix + msg, t); + } + + @Override + public void warn(String message) { + logger.warn(logPrefix + message); + } + + @Override + public void warn(String message, Object arg) { + logger.warn(logPrefix + message, arg); + } + + @Override + public void warn(String message, Object arg1, Object arg2) { + logger.warn(logPrefix + message, arg1, arg2); + } + + @Override + public void warn(String message, Object... 
args) { + logger.warn(logPrefix + message, args); + } + + @Override + public void warn(String msg, Throwable t) { + logger.warn(logPrefix + msg, t); + } + + @Override + public void warn(Marker marker, String msg) { + logger.warn(marker, logPrefix + msg); + } + + @Override + public void warn(Marker marker, String format, Object arg) { + logger.warn(marker, logPrefix + format, arg); + } + + @Override + public void warn(Marker marker, String format, Object arg1, Object arg2) { + logger.warn(marker, logPrefix + format, arg1, arg2); + } + + @Override + public void warn(Marker marker, String format, Object... arguments) { + logger.warn(marker, logPrefix + format, arguments); + } + + @Override + public void warn(Marker marker, String msg, Throwable t) { + logger.warn(marker, logPrefix + msg, t); + } + + @Override + public void error(String message) { + logger.error(logPrefix + message); + } + + @Override + public void error(String message, Object arg) { + logger.error(logPrefix + message, arg); + } + + @Override + public void error(String message, Object arg1, Object arg2) { + logger.error(logPrefix + message, arg1, arg2); + } + + @Override + public void error(String message, Object... args) { + logger.error(logPrefix + message, args); + } + + @Override + public void error(String msg, Throwable t) { + logger.error(logPrefix + msg, t); + } + + @Override + public void error(Marker marker, String msg) { + logger.error(marker, logPrefix + msg); + } + + @Override + public void error(Marker marker, String format, Object arg) { + logger.error(marker, logPrefix + format, arg); + } + + @Override + public void error(Marker marker, String format, Object arg1, Object arg2) { + logger.error(marker, logPrefix + format, arg1, arg2); + } + + @Override + public void error(Marker marker, String format, Object... 
arguments) { + logger.error(marker, logPrefix + format, arguments); + } + + @Override + public void error(Marker marker, String msg, Throwable t) { + logger.error(marker, logPrefix + msg, t); + } + + @Override + public void info(String message) { + logger.info(logPrefix + message); + } + + @Override + public void info(String message, Object arg) { + logger.info(logPrefix + message, arg); + } + + @Override + public void info(String message, Object arg1, Object arg2) { + logger.info(logPrefix + message, arg1, arg2); + } + + @Override + public void info(String message, Object... args) { + logger.info(logPrefix + message, args); + } + + @Override + public void info(String msg, Throwable t) { + logger.info(logPrefix + msg, t); + } + + @Override + public void info(Marker marker, String msg) { + logger.info(marker, logPrefix + msg); + } + + @Override + public void info(Marker marker, String format, Object arg) { + logger.info(marker, logPrefix + format, arg); + } + + @Override + public void info(Marker marker, String format, Object arg1, Object arg2) { + logger.info(marker, logPrefix + format, arg1, arg2); + } + + @Override + public void info(Marker marker, String format, Object... 
arguments) { + logger.info(marker, logPrefix + format, arguments); + } + + @Override + public void info(Marker marker, String msg, Throwable t) { + logger.info(marker, logPrefix + msg, t); + } + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 9fd7e19..eed012e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -63,6 +63,7 @@ import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -1621,8 +1622,11 @@ public class KafkaConsumerTest { ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix); SubscriptionState subscriptions = new SubscriptionState(autoResetStrategy); - ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, retryBackoffMs, requestTimeoutMs); + LogContext loggerFactory = new LogContext(); + ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time, + retryBackoffMs, requestTimeoutMs); ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator( + loggerFactory, consumerClient, groupId, rebalanceTimeoutMs, @@ -1642,6 +1646,7 @@ public class KafkaConsumerTest { true); Fetcher<String, String> fetcher = new Fetcher<>( 
+ loggerFactory, consumerClient, minBytes, maxBytes, @@ -1660,6 +1665,7 @@ public class KafkaConsumerTest { IsolationLevel.READ_UNCOMMITTED); return new KafkaConsumer<>( + loggerFactory, clientId, consumerCoordinator, keyDeserializer, @@ -1672,8 +1678,7 @@ public class KafkaConsumerTest { subscriptions, metadata, retryBackoffMs, - requestTimeoutMs - ); + requestTimeoutMs); } private static class FetchInfo { http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 637c832..f3982c0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.TestCondition; @@ -75,7 +76,7 @@ public class AbstractCoordinatorTest { this.mockClient = new MockClient(mockTime); Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true); - this.consumerClient = new ConsumerNetworkClient(mockClient, metadata, mockTime, + this.consumerClient = new ConsumerNetworkClient(new LogContext(), mockClient, metadata, mockTime, retryBackoffMs, REQUEST_TIMEOUT_MS); Metrics metrics = new Metrics(); @@ -568,8 +569,8 @@ public class AbstractCoordinatorTest { public 
DummyCoordinator(ConsumerNetworkClient client, Metrics metrics, Time time) { - super(client, GROUP_ID, REBALANCE_TIMEOUT_MS, SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, - METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS, false); + super(new LogContext(), client, GROUP_ID, REBALANCE_TIMEOUT_MS, SESSION_TIMEOUT_MS, + HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS, false); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 18e18ed..65de8c0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -51,6 +51,7 @@ import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -123,7 +124,7 @@ public class ConsumerCoordinatorTest { this.metadata = new Metadata(0, Long.MAX_VALUE, true); this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); this.client = new MockClient(time, metadata); - this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); + this.consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, 1000); this.metrics = new Metrics(time); this.rebalanceListener = new 
MockRebalanceListener(); this.mockOffsetCommitCallback = new MockCommitCallback(); @@ -1664,6 +1665,7 @@ public class ConsumerCoordinatorTest { final boolean autoCommitEnabled, final boolean leaveGroup) { return new ConsumerCoordinator( + new LogContext(), consumerClient, groupId, rebalanceTimeoutMs, http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 8e71dd5..3ed8e3d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; @@ -48,7 +49,8 @@ public class ConsumerNetworkClientTest { private Cluster cluster = TestUtils.singletonCluster(topicName, 1); private Node node = cluster.nodes().get(0); private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); - private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); + private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), + client, metadata, time, 100, 1000); @Test public void send() { @@ -105,7 +107,8 @@ public class ConsumerNetworkClientTest { @Test public void 
doNotBlockIfPollConditionIsSatisfied() { NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); - ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient, metadata, time, 100, 1000); + ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), + mockNetworkClient, metadata, time, 100, 1000); // expect poll, but with no timeout EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(0L), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList()); @@ -127,7 +130,8 @@ public class ConsumerNetworkClientTest { long timeout = 4000L; NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); - ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient, metadata, time, 100, 1000); + ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), + mockNetworkClient, metadata, time, 100, 1000); EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(1); EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(timeout), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList()); @@ -149,7 +153,8 @@ public class ConsumerNetworkClientTest { long retryBackoffMs = 100L; NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); - ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient, metadata, time, retryBackoffMs, 1000L); + ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), + mockNetworkClient, metadata, time, retryBackoffMs, 1000L); EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0); EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList()); @@ -205,7 +210,7 @@ public class ConsumerNetworkClientTest { } }; // Queue first send, sleep long enough for this to expire and then queue second send - consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 
unsentExpiryMs); + consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, unsentExpiryMs); RequestFuture<ClientResponse> future1 = consumerClient.send(node, heartbeat()); assertEquals(1, consumerClient.pendingRequestCount()); assertEquals(1, consumerClient.pendingRequestCount(node)); http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index b747677..5fe4369 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -75,6 +75,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.DelayedReceive; @@ -129,7 +130,8 @@ public class FetcherTest { private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE); private static final double EPSILON = 0.0001; - private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); + private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), + client, metadata, time, 100, 1000); private MemoryRecords records; private MemoryRecords nextRecords; @@ -2038,7 +2040,9 @@ 
public class FetcherTest { Deserializer<V> valueDeserializer, int maxPollRecords, IsolationLevel isolationLevel) { - return new Fetcher<>(consumerClient, + return new Fetcher<>( + new LogContext(), + consumerClient, minBytes, maxBytes, maxWaitMs, http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 0252ae9..6007728 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -24,11 +24,11 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata; import org.apache.kafka.common.utils.CircularIterator; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.Closeable; import java.nio.ByteBuffer; @@ -44,11 +44,10 @@ import java.util.Map; * to workers. 
*/ public final class WorkerCoordinator extends AbstractCoordinator implements Closeable { - private static final Logger log = LoggerFactory.getLogger(WorkerCoordinator.class); - // Currently doesn't support multiple task assignment strategies, so we just fill in a default value public static final String DEFAULT_SUBPROTOCOL = "default"; + private final Logger log; private final String restUrl; private final ConfigBackingStore configStorage; private ConnectProtocol.Assignment assignmentSnapshot; @@ -61,7 +60,8 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos /** * Initialize the coordination manager. */ - public WorkerCoordinator(ConsumerNetworkClient client, + public WorkerCoordinator(LogContext logContext, + ConsumerNetworkClient client, String groupId, int rebalanceTimeoutMs, int sessionTimeoutMs, @@ -73,7 +73,8 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos String restUrl, ConfigBackingStore configStorage, WorkerRebalanceListener listener) { - super(client, + super(logContext, + client, groupId, rebalanceTimeoutMs, sessionTimeoutMs, @@ -83,6 +84,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos time, retryBackoffMs, true); + this.log = logContext.logger(WorkerCoordinator.class); this.restUrl = restUrl; this.configStorage = configStorage; this.assignmentSnapshot = null;