Repository: kafka
Updated Branches:
  refs/heads/trunk ed96523a2 -> 6896f1ddb
http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 62e2fc1..4b7711a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -31,11 +31,11 @@ import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.Collections; @@ -53,11 +53,11 @@ import java.util.concurrent.atomic.AtomicReference; * higher level operations in response to group membership events being handled by the herder. */ public class WorkerGroupMember { - private static final Logger log = LoggerFactory.getLogger(WorkerGroupMember.class); private static final AtomicInteger CONNECT_CLIENT_ID_SEQUENCE = new AtomicInteger(1); private static final String JMX_PREFIX = "kafka.connect"; + private final Logger log; private final Time time; private final String clientId; private final ConsumerNetworkClient client; @@ -78,6 +78,11 @@ public class WorkerGroupMember { String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); clientId = clientIdConfig.length() <= 0 ? 
"connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig; + String groupId = config.getString(DistributedConfig.GROUP_ID_CONFIG); + + LogContext logContext = new LogContext("[Worker clientId=" + clientId + ", groupId=" + groupId + "] "); + this.log = logContext.logger(WorkerGroupMember.class); + Map<String, String> metricsTags = new LinkedHashMap<>(); metricsTags.put("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG)) @@ -105,10 +110,17 @@ public class WorkerGroupMember { time, true, new ApiVersions()); - this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs, + this.client = new ConsumerNetworkClient( + logContext, + netClient, + metadata, + time, + retryBackoffMs, config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)); - this.coordinator = new WorkerCoordinator(this.client, - config.getString(DistributedConfig.GROUP_ID_CONFIG), + this.coordinator = new WorkerCoordinator( + logContext, + this.client, + groupId, config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG), config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG), config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG), @@ -182,7 +194,7 @@ public class WorkerGroupMember { private void stop(boolean swallowException) { log.trace("Stopping the Connect group member."); - AtomicReference<Throwable> firstException = new AtomicReference<Throwable>(); + AtomicReference<Throwable> firstException = new AtomicReference<>(); this.stopped = true; ClientUtils.closeQuietly(coordinator, "coordinator", firstException); ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java ---------------------------------------------------------------------- diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index edef7dc..a6b3d49 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.storage.KafkaConfigBackingStore; @@ -88,18 +89,22 @@ public class WorkerCoordinatorTest { @Before public void setup() { + LogContext loggerFactory = new LogContext(); + this.time = new MockTime(); this.client = new MockClient(time); this.metadata = new Metadata(0, Long.MAX_VALUE, true); this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); - this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); + this.consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time, 100, 1000); this.metrics = new Metrics(time); this.rebalanceListener = new MockRebalanceListener(); this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class); client.setNode(node); - this.coordinator = new WorkerCoordinator(consumerClient, + this.coordinator = new WorkerCoordinator( + loggerFactory, + consumerClient, groupId, rebalanceTimeoutMs, sessionTimeoutMs, http://git-wip-us.apache.org/repos/asf/kafka/blob/6896f1dd/core/src/main/scala/kafka/admin/AdminClient.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 2baed02..8f2e1ba 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata import org.apache.kafka.common.requests.OffsetFetchResponse -import org.apache.kafka.common.utils.{KafkaThread, Time, Utils} +import org.apache.kafka.common.utils.{LogContext, KafkaThread, Time, Utils} import org.apache.kafka.common.{Cluster, Node, TopicPartition} import scala.collection.JavaConverters._ @@ -469,6 +469,7 @@ object AdminClient { new ApiVersions) val highLevelClient = new ConsumerNetworkClient( + new LogContext(), networkClient, metadata, time,