MINOR: Ensure consumer logging has clientId/groupId context

This patch ensures that the consumer groupId and clientId are available in all 
log messages which makes debugging much easier when a single application has 
multiple consumer instances. To make this easier, I've added a new `LogContext` 
object which builds a log prefix similar to the broker-side 
`kafka.utils.Logging` mixin. Additionally this patch changes the log level for 
a couple minor cases:

- Consumer wakeup events are now logged at DEBUG instead of TRACE
- Heartbeat enabling/disabling is now logged at DEBUG instead of TRACE

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>, Ismael Juma 
<ism...@juma.me.uk>

Closes #3676 from hachikuji/log-consumer-wakeups


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6896f1dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6896f1dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6896f1dd

Branch: refs/heads/trunk
Commit: 6896f1ddb7650f42630aef8c67c8b61866e9fc00
Parents: ed96523
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sat Aug 19 11:17:02 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Sat Aug 19 11:17:02 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  81 ++--
 .../consumer/internals/AbstractCoordinator.java |  84 ++--
 .../consumer/internals/ConsumerCoordinator.java | 148 ++++---
 .../internals/ConsumerNetworkClient.java        |  35 +-
 .../clients/consumer/internals/Fetcher.java     |  10 +-
 .../apache/kafka/common/utils/LogContext.java   | 381 +++++++++++++++++++
 .../clients/consumer/KafkaConsumerTest.java     |  11 +-
 .../internals/AbstractCoordinatorTest.java      |   7 +-
 .../internals/ConsumerCoordinatorTest.java      |   4 +-
 .../internals/ConsumerNetworkClientTest.java    |  15 +-
 .../clients/consumer/internals/FetcherTest.java |   8 +-
 .../runtime/distributed/WorkerCoordinator.java  |  12 +-
 .../runtime/distributed/WorkerGroupMember.java  |  24 +-
 .../distributed/WorkerCoordinatorTest.java      |   9 +-
 .../main/scala/kafka/admin/AdminClient.scala    |   3 +-
 15 files changed, 628 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f1351b7..073b2df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -49,10 +49,10 @@ import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Collection;
@@ -541,7 +541,6 @@ import java.util.regex.Pattern;
  */
 public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
-    private static final Logger log = 
LoggerFactory.getLogger(KafkaConsumer.class);
     private static final long NO_CURRENT_THREAD = -1L;
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.consumer";
@@ -550,6 +549,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     // Visible for testing
     final Metrics metrics;
 
+    private final Logger log;
     private final String clientId;
     private final ConsumerCoordinator coordinator;
     private final Deserializer<K> keyDeserializer;
@@ -640,7 +640,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> 
{
                           Deserializer<K> keyDeserializer,
                           Deserializer<V> valueDeserializer) {
         try {
-            log.debug("Starting the Kafka consumer");
+            String clientId = 
config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+            if (clientId.isEmpty())
+                clientId = "consumer-" + 
CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
+            this.clientId = clientId;
+            String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
+
+            LogContext logContext = new LogContext("[Consumer clientId=" + 
clientId + ", groupId=" + groupId + "] ");
+            this.log = logContext.logger(getClass());
+
+            log.debug("Initializing the Kafka consumer");
             this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             int sessionTimeOutMs = 
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
             int fetchMaxWaitMs = 
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
@@ -648,10 +657,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> 
{
                 throw new 
ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater 
than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + 
ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             this.time = Time.SYSTEM;
 
-            String clientId = 
config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
-            if (clientId.length() <= 0)
-                clientId = "consumer-" + 
CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
-            this.clientId = clientId;
             Map<String, String> metricsTags = 
Collections.singletonMap("client-id", clientId);
             MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS)
@@ -712,31 +717,39 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
                     true,
                     new ApiVersions(),
                     throttleTimeSensor);
-            this.client = new ConsumerNetworkClient(netClient, metadata, time, 
retryBackoffMs,
+            this.client = new ConsumerNetworkClient(
+                    logContext,
+                    netClient,
+                    metadata,
+                    time,
+                    retryBackoffMs,
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             OffsetResetStrategy offsetResetStrategy = 
OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
             this.subscriptions = new SubscriptionState(offsetResetStrategy);
             List<PartitionAssignor> assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);
-            this.coordinator = new ConsumerCoordinator(this.client,
-                                                       
config.getString(ConsumerConfig.GROUP_ID_CONFIG),
-                                                       
config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
-                                                       
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
-                                                       
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
-                                                       assignors,
-                                                       this.metadata,
-                                                       this.subscriptions,
-                                                       metrics,
-                                                       metricGrpPrefix,
-                                                       this.time,
-                                                       retryBackoffMs,
-                                                       
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
-                                                       
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                                                       this.interceptors,
-                                                       
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
-                                                       
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
-            this.fetcher = new Fetcher<>(this.client,
+            this.coordinator = new ConsumerCoordinator(logContext,
+                    this.client,
+                    groupId,
+                    config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
+                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
+                    config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
+                    assignors,
+                    this.metadata,
+                    this.subscriptions,
+                    metrics,
+                    metricGrpPrefix,
+                    this.time,
+                    retryBackoffMs,
+                    
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
+                    
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+                    this.interceptors,
+                    
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
+                    
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
+            this.fetcher = new Fetcher<>(
+                    logContext,
+                    this.client,
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
@@ -756,7 +769,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
 
-            log.debug("Kafka consumer with client id {} created", clientId);
+            log.debug("Kafka consumer initialized");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed
             // this is to prevent resource leak. see KAFKA-2121
@@ -767,7 +780,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     // visible for testing
-    KafkaConsumer(String clientId,
+    KafkaConsumer(LogContext logContext,
+                  String clientId,
                   ConsumerCoordinator coordinator,
                   Deserializer<K> keyDeserializer,
                   Deserializer<V> valueDeserializer,
@@ -780,6 +794,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                   Metadata metadata,
                   long retryBackoffMs,
                   long requestTimeoutMs) {
+        this.log = logContext.logger(getClass());
         this.clientId = clientId;
         this.coordinator = coordinator;
         this.keyDeserializer = keyDeserializer;
@@ -1242,7 +1257,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
     public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> 
offsets, OffsetCommitCallback callback) {
         acquireAndEnsureOpen();
         try {
-            log.debug("Committing offsets: {} ", offsets);
+            log.debug("Committing offsets: {}", offsets);
             coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
         } finally {
             release();
@@ -1440,8 +1455,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
     public void pause(Collection<TopicPartition> partitions) {
         acquireAndEnsureOpen();
         try {
+            log.debug("Pausing partitions {}", partitions);
             for (TopicPartition partition: partitions) {
-                log.debug("Pausing partition {}", partition);
                 subscriptions.pause(partition);
             }
         } finally {
@@ -1459,8 +1474,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
     public void resume(Collection<TopicPartition> partitions) {
         acquireAndEnsureOpen();
         try {
+            log.debug("Resuming partitions {}", partitions);
             for (TopicPartition partition: partitions) {
-                log.debug("Resuming partition {}", partition);
                 subscriptions.resume(partition);
             }
         } finally {
@@ -1630,7 +1645,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
     }
 
     private void close(long timeoutMs, boolean swallowException) {
-        log.trace("Closing the Kafka consumer.");
+        log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
         try {
             if (coordinator != null)
@@ -1646,7 +1661,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
         ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", 
firstException);
         ClientUtils.closeQuietly(valueDeserializer, "consumer value 
deserializer", firstException);
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
-        log.debug("Kafka consumer with client id {} has been closed", 
clientId);
+        log.debug("Kafka consumer has been closed");
         Throwable exception = firstException.get();
         if (exception != null && !swallowException) {
             if (exception instanceof InterruptException) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 74ef20a..dcf837b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -48,9 +48,9 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
@@ -90,8 +90,6 @@ import java.util.concurrent.atomic.AtomicReference;
  * when sending a request that affects the state of the group (e.g. JoinGroup, 
LeaveGroup).
  */
 public abstract class AbstractCoordinator implements Closeable {
-
-    private static final Logger log = 
LoggerFactory.getLogger(AbstractCoordinator.class);
     public static final String HEARTBEAT_THREAD_PREFIX = 
"kafka-coordinator-heartbeat-thread";
 
     private enum MemberState {
@@ -100,11 +98,12 @@ public abstract class AbstractCoordinator implements 
Closeable {
         STABLE,      // the client has joined and is sending heartbeats
     }
 
-    protected final int rebalanceTimeoutMs;
+    private final Logger log;
     private final int sessionTimeoutMs;
     private final boolean leaveGroupOnClose;
     private final GroupCoordinatorMetrics sensors;
     private final Heartbeat heartbeat;
+    protected final int rebalanceTimeoutMs;
     protected final String groupId;
     protected final ConsumerNetworkClient client;
     protected final Time time;
@@ -123,7 +122,8 @@ public abstract class AbstractCoordinator implements 
Closeable {
     /**
      * Initialize the coordination manager.
      */
-    public AbstractCoordinator(ConsumerNetworkClient client,
+    public AbstractCoordinator(LogContext logContext,
+                               ConsumerNetworkClient client,
                                String groupId,
                                int rebalanceTimeoutMs,
                                int sessionTimeoutMs,
@@ -133,6 +133,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                                Time time,
                                long retryBackoffMs,
                                boolean leaveGroupOnClose) {
+        this.log = logContext.logger(AbstractCoordinator.class);
         this.client = client;
         this.time = time;
         this.groupId = groupId;
@@ -222,7 +223,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                     if (remainingMs <= 0)
                         break;
 
-                    log.debug("Coordinator discovery failed for group {}, 
refreshing metadata", groupId);
+                    log.debug("Coordinator discovery failed, refreshing 
metadata");
                     client.awaitMetadataUpdate(remainingMs);
                 } else
                     throw future.exception();
@@ -246,7 +247,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
             // find a node to ask about the coordinator
             Node node = this.client.leastLoadedNode();
             if (node == null) {
-                log.debug("No broker available to send GroupCoordinator 
request for group {}", groupId);
+                log.debug("No broker available to send GroupCoordinator 
request");
                 return RequestFuture.noBrokersAvailable();
             } else
                 findCoordinatorFuture = sendGroupCoordinatorRequest(node);
@@ -404,7 +405,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                     // handle join completion in the callback so that the 
callback will be invoked
                     // even if the consumer is woken up before finishing the 
rebalance
                     synchronized (AbstractCoordinator.this) {
-                        log.info("Successfully joined group {} with generation 
{}", groupId, generation.generationId);
+                        log.info("Successfully joined group with generation 
{}", generation.generationId);
                         state = MemberState.STABLE;
                         rejoinNeeded = false;
 
@@ -437,7 +438,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
             return RequestFuture.coordinatorNotAvailable();
 
         // send a join group request to the coordinator
-        log.info("(Re-)joining group {}", groupId);
+        log.info("(Re-)joining group");
         JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
                 groupId,
                 this.sessionTimeoutMs,
@@ -455,7 +456,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         public void handle(JoinGroupResponse joinResponse, 
RequestFuture<ByteBuffer> future) {
             Errors error = joinResponse.error();
             if (error == Errors.NONE) {
-                log.debug("Received successful JoinGroup response for group 
{}: {}", groupId, joinResponse);
+                log.debug("Received successful JoinGroup response: {}", 
joinResponse);
                 sensors.joinLatency.record(response.requestLatencyMs());
 
                 synchronized (AbstractCoordinator.this) {
@@ -474,26 +475,25 @@ public abstract class AbstractCoordinator implements 
Closeable {
                     }
                 }
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
-                log.debug("Attempt to join group {} rejected since coordinator 
{} is loading the group.", groupId,
-                        coordinator());
+                log.debug("Attempt to join group rejected since coordinator {} 
is loading the group.", coordinator());
                 // backoff and retry
                 future.raise(error);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                 // reset the member id and retry immediately
                 resetGeneration();
-                log.debug("Attempt to join group {} failed due to unknown 
member id.", groupId);
+                log.debug("Attempt to join group failed due to unknown member 
id.");
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR) {
                 // re-discover the coordinator and retry with backoff
                 coordinatorDead();
-                log.debug("Attempt to join group {} failed due to obsolete 
coordinator information: {}", groupId, error.message());
+                log.debug("Attempt to join group failed due to obsolete 
coordinator information: {}", error.message());
                 future.raise(error);
             } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                     || error == Errors.INVALID_SESSION_TIMEOUT
                     || error == Errors.INVALID_GROUP_ID) {
                 // log the error and re-throw the exception
-                log.error("Attempt to join group {} failed due to fatal error: 
{}", groupId, error.message());
+                log.error("Attempt to join group failed due to fatal error: 
{}", error.message());
                 future.raise(error);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 future.raise(new GroupAuthorizationException(groupId));
@@ -509,8 +509,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         SyncGroupRequest.Builder requestBuilder =
                 new SyncGroupRequest.Builder(groupId, generation.generationId, 
generation.memberId,
                         Collections.<String, ByteBuffer>emptyMap());
-        log.debug("Sending follower SyncGroup for group {} to coordinator {}: 
{}", groupId, this.coordinator,
-                requestBuilder);
+        log.debug("Sending follower SyncGroup to coordinator {}: {}", 
this.coordinator, requestBuilder);
         return sendSyncGroupRequest(requestBuilder);
     }
 
@@ -522,8 +521,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
             SyncGroupRequest.Builder requestBuilder =
                     new SyncGroupRequest.Builder(groupId, 
generation.generationId, generation.memberId, groupAssignment);
-            log.debug("Sending leader SyncGroup for group {} to coordinator 
{}: {}",
-                    groupId, this.coordinator, requestBuilder);
+            log.debug("Sending leader SyncGroup to coordinator {}: {}", 
this.coordinator, requestBuilder);
             return sendSyncGroupRequest(requestBuilder);
         } catch (RuntimeException e) {
             return RequestFuture.failure(e);
@@ -551,16 +549,16 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(new GroupAuthorizationException(groupId));
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                    log.debug("SyncGroup for group {} failed due to 
coordinator rebalance", groupId);
+                    log.debug("SyncGroup failed due to group rebalance");
                     future.raise(error);
                 } else if (error == Errors.UNKNOWN_MEMBER_ID
                         || error == Errors.ILLEGAL_GENERATION) {
-                    log.debug("SyncGroup for group {} failed due to {}", 
groupId, error);
+                    log.debug("SyncGroup failed: {}", error.message());
                     resetGeneration();
                     future.raise(error);
                 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                         || error == Errors.NOT_COORDINATOR) {
-                    log.debug("SyncGroup for group {} failed due to {}", 
groupId, error);
+                    log.debug("SyncGroup failed: {}", error.message());
                     future.raise(error);
                 } else {
@@ -577,7 +575,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
      */
     private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
         // initiate the group metadata request
-        log.debug("Sending GroupCoordinator request for group {} to broker 
{}", groupId, node);
+        log.debug("Sending GroupCoordinator request to broker {}", node);
         FindCoordinatorRequest.Builder requestBuilder =
                 new 
FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, 
this.groupId);
         return client.send(node, requestBuilder)
@@ -588,7 +586,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
         @Override
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) 
{
-            log.debug("Received GroupCoordinator response {} for group {}", 
resp, groupId);
+            log.debug("Received GroupCoordinator response {}", resp);
 
             FindCoordinatorResponse findCoordinatorResponse = 
(FindCoordinatorResponse) resp.responseBody();
             // use MAX_VALUE - node.id as the coordinator id to mimic separate 
connections
@@ -602,7 +600,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                             Integer.MAX_VALUE - 
findCoordinatorResponse.node().id(),
                             findCoordinatorResponse.node().host(),
                             findCoordinatorResponse.node().port());
-                    log.info("Discovered coordinator {} for group {}.", 
coordinator, groupId);
+                    log.info("Discovered coordinator {}", coordinator);
                     client.tryConnect(coordinator);
                     heartbeat.resetTimeouts(time.milliseconds());
                 }
@@ -610,7 +608,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 future.raise(new GroupAuthorizationException(groupId));
             } else {
-                log.debug("Group coordinator lookup for group {} failed: {}", 
groupId, error.message());
+                log.debug("Group coordinator lookup failed: {}", 
error.message());
                 future.raise(error);
             }
         }
@@ -647,7 +645,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
      */
     protected synchronized void coordinatorDead() {
         if (this.coordinator != null) {
-            log.info("Marking the coordinator {} dead for group {}", 
this.coordinator, groupId);
+            log.info("Marking the coordinator {} dead", this.coordinator);
 
             // Disconnect from the coordinator to ensure that there are no 
in-flight requests remaining.
             // Pending callbacks will be invoked with a DisconnectException.
@@ -705,8 +703,8 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 // If coordinator is not known, requests are aborted.
                 Node coordinator = coordinator();
                 if (coordinator != null && 
!client.awaitPendingRequests(coordinator, timeoutMs))
-                    log.warn("Close timed out with {} pending requests to 
coordinator, terminating client connections for group {}.",
-                            client.pendingRequestCount(coordinator), groupId);
+                    log.warn("Close timed out with {} pending requests to 
coordinator, terminating client connections",
+                            client.pendingRequestCount(coordinator));
             }
         }
     }
@@ -718,7 +716,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         if (!coordinatorUnknown() && state != MemberState.UNJOINED && 
generation != Generation.NO_GENERATION) {
             // this is a minimal effort attempt to leave the group. we do not
             // attempt any resending if the request fails or times out.
-            log.debug("Sending LeaveGroup request to coordinator {} for group 
{}", coordinator, groupId);
+            log.debug("Sending LeaveGroup request to coordinator {}", 
coordinator);
             LeaveGroupRequest.Builder request =
                     new LeaveGroupRequest.Builder(groupId, 
generation.memberId);
             client.send(coordinator, request)
@@ -734,10 +732,10 @@ public abstract class AbstractCoordinator implements 
Closeable {
         public void handle(LeaveGroupResponse leaveResponse, 
RequestFuture<Void> future) {
             Errors error = leaveResponse.error();
             if (error == Errors.NONE) {
-                log.debug("LeaveGroup request for group {} returned 
successfully", groupId);
+                log.debug("LeaveGroup request returned successfully");
                 future.complete(null);
             } else {
-                log.debug("LeaveGroup request for group {} failed with error: 
{}", groupId, error.message());
+                log.debug("LeaveGroup request failed with error: {}", 
error.message());
                 future.raise(error);
             }
         }
@@ -745,7 +743,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
     // visible for testing
     synchronized RequestFuture<Void> sendHeartbeatRequest() {
-        log.debug("Sending Heartbeat request for group {} to coordinator {}", 
groupId, coordinator);
+        log.debug("Sending Heartbeat request to coordinator {}", coordinator);
         HeartbeatRequest.Builder requestBuilder =
                 new HeartbeatRequest.Builder(this.groupId, 
this.generation.generationId, this.generation.memberId);
         return client.send(coordinator, requestBuilder)
@@ -758,24 +756,24 @@ public abstract class AbstractCoordinator implements 
Closeable {
             sensors.heartbeatLatency.record(response.requestLatencyMs());
             Errors error = heartbeatResponse.error();
             if (error == Errors.NONE) {
-                log.debug("Received successful Heartbeat response for group 
{}", groupId);
+                log.debug("Received successful Heartbeat response");
                 future.complete(null);
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR) {
-                log.debug("Attempt to heartbeat failed for group {} since 
coordinator {} is either not started or not valid.",
-                        groupId, coordinator());
+                log.debug("Attempt to heartbeat failed since coordinator {} is either not started or not valid.",
+                        coordinator());
                 coordinatorDead();
                 future.raise(error);
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                log.debug("Attempt to heartbeat failed for group {} since it 
is rebalancing.", groupId);
+                log.debug("Attempt to heartbeat failed since group is 
rebalancing");
                 requestRejoin();
                 future.raise(Errors.REBALANCE_IN_PROGRESS);
             } else if (error == Errors.ILLEGAL_GENERATION) {
-                log.debug("Attempt to heartbeat failed for group {} since 
generation id is not legal.", groupId);
+                log.debug("Attempt to heartbeat failed since generation {} is 
not current", generation.generationId);
                 resetGeneration();
                 future.raise(Errors.ILLEGAL_GENERATION);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
-                log.debug("Attempt to heartbeat failed for group {} since 
member id is not valid.", groupId);
+                log.debug("Attempt to heartbeat failed since member id {} is not valid.", generation.memberId);
                 resetGeneration();
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
@@ -879,7 +877,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
         public void enable() {
             synchronized (AbstractCoordinator.this) {
-                log.trace("Enabling heartbeat thread for group {}", groupId);
+                log.debug("Enabling heartbeat thread");
                 this.enabled = true;
                 heartbeat.resetTimeouts(time.milliseconds());
                 AbstractCoordinator.this.notify();
@@ -888,7 +886,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
         public void disable() {
             synchronized (AbstractCoordinator.this) {
-                log.trace("Disabling heartbeat thread for group {}", groupId);
+                log.debug("Disabling heartbeat thread");
                 this.enabled = false;
             }
         }
@@ -911,7 +909,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         @Override
         public void run() {
             try {
-                log.debug("Heartbeat thread for group {} started", groupId);
+                log.debug("Heartbeat thread started");
                 while (true) {
                     synchronized (AbstractCoordinator.this) {
                         if (closed)
@@ -989,7 +987,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 log.error("Heartbeat thread for group {} failed due to 
unexpected error", groupId, e);
                 this.failed.set(e);
             } finally {
-                log.debug("Heartbeat thread for group {} has closed", groupId);
+                log.debug("Heartbeat thread has closed");
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5ba9ccb..3a80d06 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -47,9 +47,9 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -66,9 +66,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * This class manages the coordination process with the consumer coordinator.
  */
 public final class ConsumerCoordinator extends AbstractCoordinator {
-
-    private static final Logger log = 
LoggerFactory.getLogger(ConsumerCoordinator.class);
-
+    private final Logger log;
     private final List<PartitionAssignor> assignors;
     private final Metadata metadata;
     private final ConsumerCoordinatorMetrics sensors;
@@ -93,7 +91,8 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
     /**
      * Initialize the coordination manager.
      */
-    public ConsumerCoordinator(ConsumerNetworkClient client,
+    public ConsumerCoordinator(LogContext logContext,
+                               ConsumerNetworkClient client,
                                String groupId,
                                int rebalanceTimeoutMs,
                                int sessionTimeoutMs,
@@ -110,7 +109,8 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                                ConsumerInterceptors<?, ?> interceptors,
                                boolean excludeInternalTopics,
                                final boolean leaveGroupOnClose) {
-        super(client,
+        super(logContext,
+              client,
               groupId,
               rebalanceTimeoutMs,
               sessionTimeoutMs,
@@ -120,6 +120,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
               time,
               retryBackoffMs,
               leaveGroupOnClose);
+        this.log = logContext.logger(ConsumerCoordinator.class);
         this.metadata = metadata;
         this.metadataSnapshot = new MetadataSnapshot(subscriptions, 
metadata.fetch());
         this.subscriptions = subscriptions;
@@ -259,15 +260,14 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
         // execute the user's callback after rebalance
         ConsumerRebalanceListener listener = subscriptions.listener();
-        log.info("Setting newly assigned partitions {} for group {}", 
subscriptions.assignedPartitions(), groupId);
+        log.info("Setting newly assigned partitions {}", 
subscriptions.assignedPartitions());
         try {
             Set<TopicPartition> assigned = new 
HashSet<>(subscriptions.assignedPartitions());
             listener.onPartitionsAssigned(assigned);
         } catch (WakeupException | InterruptException e) {
             throw e;
         } catch (Exception e) {
-            log.error("User provided listener {} for group {} failed on 
partition assignment",
-                    listener.getClass().getName(), groupId, e);
+            log.error("User provided listener {} failed on partition 
assignment", listener.getClass().getName(), e);
         }
     }
 
@@ -359,8 +359,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
         isLeader = true;
 
-        log.debug("Performing assignment for group {} using strategy {} with 
subscriptions {}",
-                groupId, assignor.name(), subscriptions);
+        log.debug("Performing assignment using strategy {} with subscriptions 
{}", assignor.name(), subscriptions);
 
         Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), 
subscriptions);
 
@@ -380,15 +379,14 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         if (!assignedTopics.containsAll(allSubscribedTopics)) {
             Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
             notAssignedTopics.removeAll(assignedTopics);
-            log.warn("The following subscribed topics are not assigned to any 
members in the group {} : {} ", groupId,
-                    notAssignedTopics);
+            log.warn("The following subscribed topics are not assigned to any 
members: {} ", notAssignedTopics);
         }
 
         if (!allSubscribedTopics.containsAll(assignedTopics)) {
             Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
             newlyAddedTopics.removeAll(allSubscribedTopics);
-            log.info("The following not-subscribed topics are assigned to 
group {}, and their metadata will be " +
-                    "fetched from the brokers : {}", groupId, 
newlyAddedTopics);
+            log.info("The following not-subscribed topics are assigned, and 
their metadata will be " +
+                    "fetched from the brokers: {}", newlyAddedTopics);
 
             allSubscribedTopics.addAll(assignedTopics);
             this.subscriptions.groupSubscribe(allSubscribedTopics);
@@ -398,7 +396,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
         assignmentSnapshot = metadataSnapshot;
 
-        log.debug("Finished assignment for group {}: {}", groupId, assignment);
+        log.debug("Finished assignment for group: {}", assignment);
 
         Map<String, ByteBuffer> groupAssignment = new HashMap<>();
         for (Map.Entry<String, Assignment> assignmentEntry : 
assignment.entrySet()) {
@@ -416,15 +414,14 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
         // execute the user's callback before rebalance
         ConsumerRebalanceListener listener = subscriptions.listener();
-        log.info("Revoking previously assigned partitions {} for group {}", 
subscriptions.assignedPartitions(), groupId);
+        log.info("Revoking previously assigned partitions {}", 
subscriptions.assignedPartitions());
         try {
             Set<TopicPartition> revoked = new 
HashSet<>(subscriptions.assignedPartitions());
             listener.onPartitionsRevoked(revoked);
         } catch (WakeupException | InterruptException e) {
             throw e;
         } catch (Exception e) {
-            log.error("User provided listener {} for group {} failed on 
partition revocation",
-                    listener.getClass().getName(), groupId, e);
+            log.error("User provided listener {} failed on partition 
revocation", listener.getClass().getName(), e);
         }
 
         isLeader = false;
@@ -645,18 +642,17 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
     private void doAutoCommitOffsetsAsync() {
         Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = 
subscriptions.allConsumed();
-        log.debug("Sending asynchronous auto-commit of offsets {} for group 
{}", allConsumedOffsets, groupId);
+        log.debug("Sending asynchronous auto-commit of offsets {}", 
allConsumedOffsets);
 
         commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
             @Override
             public void onComplete(Map<TopicPartition, OffsetAndMetadata> 
offsets, Exception exception) {
                 if (exception != null) {
-                    log.warn("Auto-commit of offsets {} failed for group {}: 
{}", offsets, groupId,
-                            exception.getMessage());
+                    log.warn("Asynchronous auto-commit of offsets {} failed: 
{}", offsets, exception.getMessage());
                     if (exception instanceof RetriableException)
                         nextAutoCommitDeadline = Math.min(time.milliseconds() 
+ retryBackoffMs, nextAutoCommitDeadline);
                 } else {
-                    log.debug("Completed auto-commit of offsets {} for group 
{}", offsets, groupId);
+                    log.debug("Completed asynchronous auto-commit of offsets 
{}", offsets);
                 }
             }
         });
@@ -666,19 +662,16 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         if (autoCommitEnabled) {
             Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = 
subscriptions.allConsumed();
             try {
-                log.debug("Sending synchronous auto-commit of offsets {} for 
group {}", allConsumedOffsets, groupId);
+                log.debug("Sending synchronous auto-commit of offsets {}", 
allConsumedOffsets);
                 if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
-                    log.debug("Auto-commit of offsets {} for group {} timed 
out before completion",
-                            allConsumedOffsets, groupId);
+                    log.debug("Auto-commit of offsets {} timed out before 
completion", allConsumedOffsets);
             } catch (WakeupException | InterruptException e) {
-                log.debug("Auto-commit of offsets {} for group {} was 
interrupted before completion",
-                        allConsumedOffsets, groupId);
+                log.debug("Auto-commit of offsets {} was interrupted before 
completion", allConsumedOffsets);
                 // rethrow wakeups since they are triggered by the user
                 throw e;
             } catch (Exception e) {
                 // consistent with async auto-commit failures, we do not 
propagate the exception
-                log.warn("Auto-commit of offsets {} failed for group {}: {}", 
allConsumedOffsets, groupId,
-                        e.getMessage());
+                log.warn("Synchronous auto-commit of offsets {} failed: {}", 
allConsumedOffsets, e.getMessage());
             }
         }
     }
@@ -687,7 +680,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         @Override
         public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
Exception exception) {
             if (exception != null)
-                log.error("Offset commit with offsets {} failed for group {}", 
offsets, groupId, exception);
+                log.error("Offset commit with offsets {} failed", offsets, 
exception);
         }
     }
 
@@ -734,7 +727,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                 setMemberId(generation.memberId).
                 setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);
 
-        log.trace("Sending OffsetCommit request with {} to coordinator {} for 
group {}", offsets, coordinator, groupId);
+        log.trace("Sending OffsetCommit request with {} to coordinator {}", 
offsets, coordinator);
 
         return client.send(coordinator, builder)
                 .compose(new OffsetCommitResponseHandler(offsets));
@@ -760,55 +753,52 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
                 Errors error = entry.getValue();
                 if (error == Errors.NONE) {
-                    log.debug("Group {} committed offset {} for partition {}", 
groupId, offset, tp);
+                    log.debug("Committed offset {} for partition {}", offset, 
tp);
                     if (subscriptions.isAssigned(tp))
                         // update the local cache only if the partition is 
still assigned
                         subscriptions.committed(tp, offsetAndMetadata);
-                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                    log.error("Not authorized to commit offsets for group {}", 
groupId);
-                    future.raise(new GroupAuthorizationException(groupId));
-                    return;
-                } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-                    unauthorizedTopics.add(tp.topic());
-                } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
-                        || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
-                    // raise the error to the user
-                    log.debug("Offset commit for group {} failed on partition 
{}: {}", groupId, tp, error.message());
-                    future.raise(error);
-                    return;
-                } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
-                    // just retry
-                    log.debug("Offset commit for group {} failed: {}", 
groupId, error.message());
-                    future.raise(error);
-                    return;
-                } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
-                        || error == Errors.NOT_COORDINATOR
-                        || error == Errors.REQUEST_TIMED_OUT) {
-                    log.debug("Offset commit for group {} failed: {}", 
groupId, error.message());
-                    coordinatorDead();
-                    future.raise(error);
-                    return;
-                } else if (error == Errors.UNKNOWN_MEMBER_ID
-                        || error == Errors.ILLEGAL_GENERATION
-                        || error == Errors.REBALANCE_IN_PROGRESS) {
-                    // need to re-join group
-                    log.debug("Offset commit for group {} failed: {}", 
groupId, error.message());
-                    resetGeneration();
-                    future.raise(new CommitFailedException());
-                    return;
-                } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                    log.debug("Offset commit for group {} failed on partition 
{}: {}", groupId, tp, error.message());
-                    future.raise(new KafkaException("Partition " + tp + " may 
not exist or user may not have Describe access to topic"));
-                    return;
                 } else {
-                    log.error("Group {} failed to commit partition {} at 
offset {}: {}", groupId, tp, offset, error.message());
-                    future.raise(new KafkaException("Unexpected error in 
commit: " + error.message()));
-                    return;
+                    log.error("Offset commit failed on partition {} at offset 
{}: {}", tp, offset, error.message());
+
+                    if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                        future.raise(new GroupAuthorizationException(groupId));
+                        return;
+                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                        unauthorizedTopics.add(tp.topic());
+                    } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
+                            || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
+                        // raise the error to the user
+                        future.raise(error);
+                        return;
+                    } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                        // just retry
+                        future.raise(error);
+                        return;
+                    } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                            || error == Errors.NOT_COORDINATOR
+                            || error == Errors.REQUEST_TIMED_OUT) {
+                        coordinatorDead();
+                        future.raise(error);
+                        return;
+                    } else if (error == Errors.UNKNOWN_MEMBER_ID
+                            || error == Errors.ILLEGAL_GENERATION
+                            || error == Errors.REBALANCE_IN_PROGRESS) {
+                        // need to re-join group
+                        resetGeneration();
+                        future.raise(new CommitFailedException());
+                        return;
+                    } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        future.raise(new KafkaException("Partition " + tp + " 
may not exist or user may not have Describe access to topic"));
+                        return;
+                    } else {
+                        future.raise(new KafkaException("Unexpected error in 
commit: " + error.message()));
+                        return;
+                    }
                 }
             }
 
             if (!unauthorizedTopics.isEmpty()) {
-                log.error("Not authorized to commit to topics {} for group 
{}", unauthorizedTopics, groupId);
+                log.error("Not authorized to commit to topics {}", 
unauthorizedTopics);
                 future.raise(new 
TopicAuthorizationException(unauthorizedTopics));
             } else {
                 future.complete(null);
@@ -828,10 +818,10 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         if (coordinator == null)
             return RequestFuture.coordinatorNotAvailable();
 
-        log.debug("Group {} fetching committed offsets for partitions: {}", 
groupId, partitions);
+        log.debug("Fetching committed offsets for partitions: {}", partitions);
         // construct the request
-        OffsetFetchRequest.Builder requestBuilder =
-                new OffsetFetchRequest.Builder(this.groupId, new 
ArrayList<>(partitions));
+        OffsetFetchRequest.Builder requestBuilder = new 
OffsetFetchRequest.Builder(this.groupId,
+                new ArrayList<>(partitions));
 
         // send the request with a callback
         return client.send(coordinator, requestBuilder)
@@ -843,7 +833,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         public void handle(OffsetFetchResponse response, 
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
             if (response.hasError()) {
                 Errors error = response.error();
-                log.debug("Offset fetch for group {} failed: {}", groupId, 
error.message());
+                log.debug("Offset fetch failed: {}", error.message());
 
                 if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                     // just retry
@@ -866,7 +856,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                 OffsetFetchResponse.PartitionData data = entry.getValue();
                 if (data.hasError()) {
                     Errors error = data.error;
-                    log.debug("Group {} failed to fetch offset for partition 
{}: {}", groupId, tp, error.message());
+                    log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
 
                     if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                         future.raise(new KafkaException("Partition " + tp + " 
may not exist or the user may not have " +
@@ -879,7 +869,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                     // record the position with the offset (-1 indicates no 
committed offset to fetch)
                     offsets.put(tp, new OffsetAndMetadata(data.offset, 
data.metadata));
                 } else {
-                    log.debug("Group {} has no committed offset for partition 
{}", groupId, tp);
+                    log.debug("Found no committed offset for partition {}", 
tp);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index bb7f9f2..4a38d04 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -16,17 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
@@ -40,9 +29,21 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Higher level consumer access to the network layer with basic support for 
request futures. This class
@@ -50,11 +51,11 @@ import org.slf4j.LoggerFactory;
  * are held when they are invoked.
  */
 public class ConsumerNetworkClient implements Closeable {
-    private static final Logger log = 
LoggerFactory.getLogger(ConsumerNetworkClient.class);
     private static final long MAX_POLL_TIMEOUT_MS = 5000L;
 
     // the mutable state of this class is protected by the object's monitor 
(excluding the wakeup
     // flag and the request completion queue below).
+    private final Logger log;
     private final KafkaClient client;
     private final UnsentRequests unsent = new UnsentRequests();
     private final Metadata metadata;
@@ -71,11 +72,13 @@ public class ConsumerNetworkClient implements Closeable {
     // atomic to avoid the need to acquire the lock above in order to enable 
it concurrently.
     private final AtomicBoolean wakeup = new AtomicBoolean(false);
 
-    public ConsumerNetworkClient(KafkaClient client,
+    public ConsumerNetworkClient(LogContext logContext,
+                                 KafkaClient client,
                                  Metadata metadata,
                                  Time time,
                                  long retryBackoffMs,
                                  long requestTimeoutMs) {
+        this.log = logContext.logger(ConsumerNetworkClient.class);
         this.client = client;
         this.metadata = metadata;
         this.time = time;
@@ -152,7 +155,7 @@ public class ConsumerNetworkClient implements Closeable {
     public void wakeup() {
         // wakeup should be safe without holding the client lock since it 
simply delegates to
         // Selector's wakeup, which is threadsafe
-        log.trace("Received user wakeup");
+        log.debug("Received user wakeup");
         this.wakeup.set(true);
         this.client.wakeup();
     }
@@ -426,7 +429,7 @@ public class ConsumerNetworkClient implements Closeable {
 
     public void maybeTriggerWakeup() {
         if (!wakeupDisabled.get() && wakeup.get()) {
-            log.trace("Raising wakeup exception in response to user wakeup");
+            log.debug("Raising WakeupException in response to user wakeup");
             wakeup.set(false);
             throw new WakeupException();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 23c5902..01db34f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -62,10 +62,10 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.ExtendedDeserializer;
 import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
@@ -91,9 +91,7 @@ import static 
org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper
  * This class manage the fetching process with the brokers.
  */
 public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
-
-    private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
-
+    private final Logger log;
     private final ConsumerNetworkClient client;
     private final Time time;
     private final int minBytes;
@@ -115,7 +113,8 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
 
     private PartitionRecords nextInLineRecords = null;
 
-    public Fetcher(ConsumerNetworkClient client,
+    public Fetcher(LogContext logContext,
+                   ConsumerNetworkClient client,
                    int minBytes,
                    int maxBytes,
                    int maxWaitMs,
@@ -131,6 +130,7 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
                    Time time,
                    long retryBackoffMs,
                    IsolationLevel isolationLevel) {
+        this.log = logContext.logger(Fetcher.class);
         this.time = time;
         this.client = client;
         this.metadata = metadata;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java 
b/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java
new file mode 100644
index 0000000..bbf51fa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+
+/**
+ * This class provides a way to instrument loggers with a common context which 
can be used to
+ * automatically enrich log messages. For example, in the KafkaConsumer, it is 
often useful to know
+ * the groupId of the consumer, so this can be added to a context object which 
can then be passed to
+ * all of the dependent components in order to build new loggers. This removes 
the need to manually
+ * add the groupId to each message.
+ */
+public class LogContext {
+
+    private final String logPrefix;
+
+    public LogContext(String logPrefix) {
+        this.logPrefix = logPrefix == null ? "" : logPrefix;
+    }
+
+    public LogContext() {
+        this("");
+    }
+
+    public Logger logger(Class<?> clazz) {
+        return new KafkaLogger(clazz, logPrefix);
+    }
+
+    private static class KafkaLogger implements Logger {
+        private final Logger logger;
+        private final String logPrefix;
+
+        public KafkaLogger(Class<?> clazz, String logPrefix) {
+            this.logger = LoggerFactory.getLogger(clazz);
+            this.logPrefix = logPrefix;
+        }
+
+        @Override
+        public String getName() {
+            return logger.getName();
+        }
+
+        @Override
+        public boolean isTraceEnabled() {
+            return logger.isTraceEnabled();
+        }
+
+        @Override
+        public boolean isTraceEnabled(Marker marker) {
+            return logger.isTraceEnabled(marker);
+        }
+
+        @Override
+        public boolean isDebugEnabled() {
+            return logger.isDebugEnabled();
+        }
+
+        @Override
+        public boolean isDebugEnabled(Marker marker) {
+            return logger.isDebugEnabled(marker);
+        }
+
+        @Override
+        public boolean isInfoEnabled() {
+            return logger.isInfoEnabled();
+        }
+
+        @Override
+        public boolean isInfoEnabled(Marker marker) {
+            return logger.isInfoEnabled(marker);
+        }
+
+        @Override
+        public boolean isWarnEnabled() {
+            return logger.isWarnEnabled();
+        }
+
+        @Override
+        public boolean isWarnEnabled(Marker marker) {
+            return logger.isWarnEnabled(marker);
+        }
+
+        @Override
+        public boolean isErrorEnabled() {
+            return logger.isErrorEnabled();
+        }
+
+        @Override
+        public boolean isErrorEnabled(Marker marker) {
+            return logger.isErrorEnabled(marker);
+        }
+
+        @Override
+        public void trace(String message) {
+            if (logger.isTraceEnabled())
+                logger.trace(logPrefix + message);
+        }
+
+        @Override
+        public void trace(String message, Object arg) {
+            if (logger.isTraceEnabled())
+                logger.trace(logPrefix + message, arg);
+        }
+
+        @Override
+        public void trace(String message, Object arg1, Object arg2) {
+            if (logger.isTraceEnabled())
+                logger.trace(logPrefix + message, arg1, arg2);
+        }
+
+        @Override
+        public void trace(String message, Object... args) {
+            if (logger.isTraceEnabled())
+                logger.trace(logPrefix + message, args);
+        }
+
+        @Override
+        public void trace(String msg, Throwable t) {
+            if (logger.isTraceEnabled())
+                logger.trace(logPrefix + msg, t);
+        }
+
+        @Override
+        public void trace(Marker marker, String msg) {
+            if (logger.isTraceEnabled())
+                logger.trace(marker, logPrefix + msg);
+        }
+
+        @Override
+        public void trace(Marker marker, String format, Object arg) {
+            if (logger.isTraceEnabled())
+                logger.trace(marker, logPrefix + format, arg);
+        }
+
+        @Override
+        public void trace(Marker marker, String format, Object arg1, Object 
arg2) {
+            if (logger.isTraceEnabled())
+                logger.trace(marker, logPrefix + format, arg1, arg2);
+        }
+
+        @Override
+        public void trace(Marker marker, String format, Object... argArray) {
+            if (logger.isTraceEnabled())
+                logger.trace(marker, logPrefix + format, argArray);
+        }
+
+        @Override
+        public void trace(Marker marker, String msg, Throwable t) {
+            if (logger.isTraceEnabled())
+                logger.trace(marker, logPrefix + msg, t);
+        }
+
+        @Override
+        public void debug(String message) {
+            if (logger.isDebugEnabled())
+                logger.debug(logPrefix + message);
+        }
+
+        @Override
+        public void debug(String message, Object arg) {
+            if (logger.isDebugEnabled())
+                logger.debug(logPrefix + message, arg);
+        }
+
+        @Override
+        public void debug(String message, Object arg1, Object arg2) {
+            if (logger.isDebugEnabled())
+                logger.debug(logPrefix + message, arg1, arg2);
+        }
+
+        @Override
+        public void debug(String message, Object... args) {
+            if (logger.isDebugEnabled())
+                logger.debug(logPrefix + message, args);
+        }
+
+        @Override
+        public void debug(String msg, Throwable t) {
+            if (logger.isDebugEnabled())
+                logger.debug(logPrefix + msg, t);
+        }
+
+        @Override
+        public void debug(Marker marker, String msg) {
+            if (logger.isDebugEnabled())
+                logger.debug(marker, logPrefix + msg);
+        }
+
+        @Override
+        public void debug(Marker marker, String format, Object arg) {
+            if (logger.isDebugEnabled())
+                logger.debug(marker, logPrefix + format, arg);
+        }
+
+        @Override
+        public void debug(Marker marker, String format, Object arg1, Object 
arg2) {
+            if (logger.isDebugEnabled())
+                logger.debug(marker, logPrefix + format, arg1, arg2);
+        }
+
+        @Override
+        public void debug(Marker marker, String format, Object... arguments) {
+            if (logger.isDebugEnabled())
+                logger.debug(marker, logPrefix + format, arguments);
+        }
+
+        @Override
+        public void debug(Marker marker, String msg, Throwable t) {
+            if (logger.isDebugEnabled())
+                logger.debug(marker, logPrefix + msg, t);
+        }
+
+        @Override
+        public void warn(String message) {
+            logger.warn(logPrefix + message);
+        }
+
+        @Override
+        public void warn(String message, Object arg) {
+            logger.warn(logPrefix + message, arg);
+        }
+
+        @Override
+        public void warn(String message, Object arg1, Object arg2) {
+            logger.warn(logPrefix + message, arg1, arg2);
+        }
+
+        @Override
+        public void warn(String message, Object... args) {
+            logger.warn(logPrefix + message, args);
+        }
+
+        @Override
+        public void warn(String msg, Throwable t) {
+            logger.warn(logPrefix + msg, t);
+        }
+
+        @Override
+        public void warn(Marker marker, String msg) {
+            logger.warn(marker, logPrefix + msg);
+        }
+
+        @Override
+        public void warn(Marker marker, String format, Object arg) {
+            logger.warn(marker, logPrefix + format, arg);
+        }
+
+        @Override
+        public void warn(Marker marker, String format, Object arg1, Object 
arg2) {
+            logger.warn(marker, logPrefix + format, arg1, arg2);
+        }
+
+        @Override
+        public void warn(Marker marker, String format, Object... arguments) {
+            logger.warn(marker, logPrefix + format, arguments);
+        }
+
+        @Override
+        public void warn(Marker marker, String msg, Throwable t) {
+            logger.warn(marker, logPrefix + msg, t);
+        }
+
+        @Override
+        public void error(String message) {
+            logger.error(logPrefix + message);
+        }
+
+        @Override
+        public void error(String message, Object arg) {
+            logger.error(logPrefix + message, arg);
+        }
+
+        @Override
+        public void error(String message, Object arg1, Object arg2) {
+            logger.error(logPrefix + message, arg1, arg2);
+        }
+
+        @Override
+        public void error(String message, Object... args) {
+            logger.error(logPrefix + message, args);
+        }
+
+        @Override
+        public void error(String msg, Throwable t) {
+            logger.error(logPrefix + msg, t);
+        }
+
+        @Override
+        public void error(Marker marker, String msg) {
+            logger.error(marker, logPrefix + msg);
+        }
+
+        @Override
+        public void error(Marker marker, String format, Object arg) {
+            logger.error(marker, logPrefix + format, arg);
+        }
+
+        @Override
+        public void error(Marker marker, String format, Object arg1, Object 
arg2) {
+            logger.error(marker, logPrefix + format, arg1, arg2);
+        }
+
+        @Override
+        public void error(Marker marker, String format, Object... arguments) {
+            logger.error(marker, logPrefix + format, arguments);
+        }
+
+        @Override
+        public void error(Marker marker, String msg, Throwable t) {
+            logger.error(marker, logPrefix + msg, t);
+        }
+
+        @Override
+        public void info(String message) {
+            logger.info(logPrefix + message);
+        }
+
+        @Override
+        public void info(String message, Object arg) {
+            logger.info(logPrefix + message, arg);
+        }
+
+        @Override
+        public void info(String message, Object arg1, Object arg2) {
+            logger.info(logPrefix + message, arg1, arg2);
+        }
+
+        @Override
+        public void info(String message, Object... args) {
+            logger.info(logPrefix + message, args);
+        }
+
+        @Override
+        public void info(String msg, Throwable t) {
+            logger.info(logPrefix + msg, t);
+        }
+
+        @Override
+        public void info(Marker marker, String msg) {
+            logger.info(marker, logPrefix + msg);
+        }
+
+        @Override
+        public void info(Marker marker, String format, Object arg) {
+            logger.info(marker, logPrefix + format, arg);
+        }
+
+        @Override
+        public void info(Marker marker, String format, Object arg1, Object 
arg2) {
+            logger.info(marker, logPrefix + format, arg1, arg2);
+        }
+
+        @Override
+        public void info(Marker marker, String format, Object... arguments) {
+            logger.info(marker, logPrefix + format, arguments);
+        }
+
+        @Override
+        public void info(Marker marker, String msg, Throwable t) {
+            logger.info(marker, logPrefix + msg, t);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9fd7e19..eed012e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -63,6 +63,7 @@ import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -1621,8 +1622,11 @@ public class KafkaConsumerTest {
         ConsumerMetrics metricsRegistry = new 
ConsumerMetrics(metricGroupPrefix);
 
         SubscriptionState subscriptions = new 
SubscriptionState(autoResetStrategy);
-        ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(client, metadata, time, retryBackoffMs, requestTimeoutMs);
+        LogContext loggerFactory = new LogContext();
+        ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(loggerFactory, client, metadata, time,
+                retryBackoffMs, requestTimeoutMs);
         ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
+                loggerFactory,
                 consumerClient,
                 groupId,
                 rebalanceTimeoutMs,
@@ -1642,6 +1646,7 @@ public class KafkaConsumerTest {
                 true);
 
         Fetcher<String, String> fetcher = new Fetcher<>(
+                loggerFactory,
                 consumerClient,
                 minBytes,
                 maxBytes,
@@ -1660,6 +1665,7 @@ public class KafkaConsumerTest {
                 IsolationLevel.READ_UNCOMMITTED);
 
         return new KafkaConsumer<>(
+                loggerFactory,
                 clientId,
                 consumerCoordinator,
                 keyDeserializer,
@@ -1672,8 +1678,7 @@ public class KafkaConsumerTest {
                 subscriptions,
                 metadata,
                 retryBackoffMs,
-                requestTimeoutMs
-        );
+                requestTimeoutMs);
     }
 
     private static class FetchInfo {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 637c832..f3982c0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestCondition;
@@ -75,7 +76,7 @@ public class AbstractCoordinatorTest {
         this.mockClient = new MockClient(mockTime);
 
         Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true);
-        this.consumerClient = new ConsumerNetworkClient(mockClient, metadata, 
mockTime,
+        this.consumerClient = new ConsumerNetworkClient(new LogContext(), 
mockClient, metadata, mockTime,
                 retryBackoffMs, REQUEST_TIMEOUT_MS);
         Metrics metrics = new Metrics();
 
@@ -568,8 +569,8 @@ public class AbstractCoordinatorTest {
         public DummyCoordinator(ConsumerNetworkClient client,
                                 Metrics metrics,
                                 Time time) {
-            super(client, GROUP_ID, REBALANCE_TIMEOUT_MS, SESSION_TIMEOUT_MS, 
HEARTBEAT_INTERVAL_MS, metrics,
-                  METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS, false);
+            super(new LogContext(), client, GROUP_ID, REBALANCE_TIMEOUT_MS, 
SESSION_TIMEOUT_MS,
+                    HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, 
RETRY_BACKOFF_MS, false);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 18e18ed..65de8c0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -123,7 +124,7 @@ public class ConsumerCoordinatorTest {
         this.metadata = new Metadata(0, Long.MAX_VALUE, true);
         this.metadata.update(cluster, Collections.<String>emptySet(), 
time.milliseconds());
         this.client = new MockClient(time, metadata);
-        this.consumerClient = new ConsumerNetworkClient(client, metadata, 
time, 100, 1000);
+        this.consumerClient = new ConsumerNetworkClient(new LogContext(), 
client, metadata, time, 100, 1000);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
         this.mockOffsetCommitCallback = new MockCommitCallback();
@@ -1664,6 +1665,7 @@ public class ConsumerCoordinatorTest {
                                                  final boolean 
autoCommitEnabled,
                                                  final boolean leaveGroup) {
         return new ConsumerCoordinator(
+                new LogContext(),
                 consumerClient,
                 groupId,
                 rebalanceTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 8e71dd5..3ed8e3d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -48,7 +49,8 @@ public class ConsumerNetworkClientTest {
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-    private ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(client, metadata, time, 100, 1000);
+    private ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(new LogContext(),
+            client, metadata, time, 100, 1000);
 
     @Test
     public void send() {
@@ -105,7 +107,8 @@ public class ConsumerNetworkClientTest {
     @Test
     public void doNotBlockIfPollConditionIsSatisfied() {
         NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
-        ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(mockNetworkClient, metadata, time, 100, 1000);
+        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new 
LogContext(),
+                mockNetworkClient, metadata, time, 100, 1000);
 
         // expect poll, but with no timeout
         EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(0L), 
EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
@@ -127,7 +130,8 @@ public class ConsumerNetworkClientTest {
         long timeout = 4000L;
 
         NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
-        ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(mockNetworkClient, metadata, time, 100, 1000);
+        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new 
LogContext(),
+                mockNetworkClient, metadata, time, 100, 1000);
 
         EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(1);
         EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(timeout), 
EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
@@ -149,7 +153,8 @@ public class ConsumerNetworkClientTest {
         long retryBackoffMs = 100L;
 
         NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
-        ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(mockNetworkClient, metadata, time, retryBackoffMs, 1000L);
+        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new 
LogContext(),
+                mockNetworkClient, metadata, time, retryBackoffMs, 1000L);
 
         EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0);
         EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs), 
EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
@@ -205,7 +210,7 @@ public class ConsumerNetworkClientTest {
             }
         };
         // Queue first send, sleep long enough for this to expire and then 
queue second send
-        consumerClient = new ConsumerNetworkClient(client, metadata, time, 
100, unsentExpiryMs);
+        consumerClient = new ConsumerNetworkClient(new LogContext(), client, 
metadata, time, 100, unsentExpiryMs);
         RequestFuture<ClientResponse> future1 = consumerClient.send(node, 
heartbeat());
         assertEquals(1, consumerClient.pendingRequestCount());
         assertEquals(1, consumerClient.pendingRequestCount(node));

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b747677..5fe4369 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -75,6 +75,7 @@ import 
org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.DelayedReceive;
@@ -129,7 +130,8 @@ public class FetcherTest {
     private SubscriptionState subscriptions = new 
SubscriptionState(OffsetResetStrategy.EARLIEST);
     private SubscriptionState subscriptionsNoAutoReset = new 
SubscriptionState(OffsetResetStrategy.NONE);
     private static final double EPSILON = 0.0001;
-    private ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(client, metadata, time, 100, 1000);
+    private ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(new LogContext(),
+            client, metadata, time, 100, 1000);
 
     private MemoryRecords records;
     private MemoryRecords nextRecords;
@@ -2038,7 +2040,9 @@ public class FetcherTest {
                                                Deserializer<V> 
valueDeserializer,
                                                int maxPollRecords,
                                                IsolationLevel isolationLevel) {
-        return new Fetcher<>(consumerClient,
+        return new Fetcher<>(
+                new LogContext(),
+                consumerClient,
                 minBytes,
                 maxBytes,
                 maxWaitMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 0252ae9..6007728 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -24,11 +24,11 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.utils.CircularIterator;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
@@ -44,11 +44,10 @@ import java.util.Map;
  * to workers.
  */
 public final class WorkerCoordinator extends AbstractCoordinator implements 
Closeable {
-    private static final Logger log = 
LoggerFactory.getLogger(WorkerCoordinator.class);
-
     // Currently doesn't support multiple task assignment strategies, so we 
just fill in a default value
     public static final String DEFAULT_SUBPROTOCOL = "default";
 
+    private final Logger log;
     private final String restUrl;
     private final ConfigBackingStore configStorage;
     private ConnectProtocol.Assignment assignmentSnapshot;
@@ -61,7 +60,8 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
     /**
      * Initialize the coordination manager.
      */
-    public WorkerCoordinator(ConsumerNetworkClient client,
+    public WorkerCoordinator(LogContext logContext,
+                             ConsumerNetworkClient client,
                              String groupId,
                              int rebalanceTimeoutMs,
                              int sessionTimeoutMs,
@@ -73,7 +73,8 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
                              String restUrl,
                              ConfigBackingStore configStorage,
                              WorkerRebalanceListener listener) {
-        super(client,
+        super(logContext,
+              client,
               groupId,
               rebalanceTimeoutMs,
               sessionTimeoutMs,
@@ -83,6 +84,7 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
               time,
               retryBackoffMs,
               true);
+        this.log = logContext.logger(WorkerCoordinator.class);
         this.restUrl = restUrl;
         this.configStorage = configStorage;
         this.assignmentSnapshot = null;

Reply via email to