Repository: kafka
Updated Branches:
  refs/heads/trunk ed96523a2 -> 6896f1ddb


http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 62e2fc1..4b7711a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -31,11 +31,11 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
@@ -53,11 +53,11 @@ import java.util.concurrent.atomic.AtomicReference;
  * higher level operations in response to group membership events being handled by the herder.
  */
 public class WorkerGroupMember {
-    private static final Logger log = LoggerFactory.getLogger(WorkerGroupMember.class);
 
     private static final AtomicInteger CONNECT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.connect";
 
+    private final Logger log;
     private final Time time;
     private final String clientId;
     private final ConsumerNetworkClient client;
@@ -78,6 +78,11 @@ public class WorkerGroupMember {
 
             String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
             clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
+            String groupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
+
+            LogContext logContext = new LogContext("[Worker clientId=" + clientId + ", groupId=" + groupId + "] ");
+            this.log = logContext.logger(WorkerGroupMember.class);
+
             Map<String, String> metricsTags = new LinkedHashMap<>();
             metricsTags.put("client-id", clientId);
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
@@ -105,10 +110,17 @@ public class WorkerGroupMember {
                     time,
                     true,
                     new ApiVersions());
-            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
+            this.client = new ConsumerNetworkClient(
+                    logContext,
+                    netClient,
+                    metadata,
+                    time,
+                    retryBackoffMs,
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
-            this.coordinator = new WorkerCoordinator(this.client,
-                    config.getString(DistributedConfig.GROUP_ID_CONFIG),
+            this.coordinator = new WorkerCoordinator(
+                    logContext,
+                    this.client,
+                    groupId,
                     config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG),
                     config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
                     config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
@@ -182,7 +194,7 @@ public class WorkerGroupMember {
 
     private void stop(boolean swallowException) {
         log.trace("Stopping the Connect group member.");
-        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+        AtomicReference<Throwable> firstException = new AtomicReference<>();
         this.stopped = true;
         ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
         ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index edef7dc..a6b3d49 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
@@ -88,18 +89,22 @@ public class WorkerCoordinatorTest {
 
     @Before
     public void setup() {
+        LogContext loggerFactory = new LogContext();
+
         this.time = new MockTime();
         this.client = new MockClient(time);
         this.metadata = new Metadata(0, Long.MAX_VALUE, true);
         this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
+        this.consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time, 100, 1000);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
         this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class);
 
         client.setNode(node);
 
-        this.coordinator = new WorkerCoordinator(consumerClient,
+        this.coordinator = new WorkerCoordinator(
+                loggerFactory,
+                consumerClient,
                 groupId,
                 rebalanceTimeoutMs,
                 sessionTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 2baed02..8f2e1ba 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
 import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
+import org.apache.kafka.common.utils.{LogContext, KafkaThread, Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
@@ -469,6 +469,7 @@ object AdminClient {
       new ApiVersions)
 
     val highLevelClient = new ConsumerNetworkClient(
+      new LogContext(),
       networkClient,
       metadata,
       time,

Reply via email to