Repository: kafka Updated Branches: refs/heads/trunk 8de62253a -> ef5d168cc
KAFKA-2697: client-side support for leave group Author: Jason Gustafson <[email protected]> Reviewers: Ewen Cheslack-Postava, Guozhang Wang Closes #414 from hachikuji/KAFKA-2697 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef5d168c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef5d168c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef5d168c Branch: refs/heads/trunk Commit: ef5d168cc8f10ad4f0efe9df4cbe849a4b35496e Parents: 8de6225 Author: Jason Gustafson <[email protected]> Authored: Wed Nov 4 15:04:03 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 4 15:04:03 2015 -0800 ---------------------------------------------------------------------- .../apache/kafka/clients/consumer/Consumer.java | 12 ++-- .../kafka/clients/consumer/KafkaConsumer.java | 18 +++-- .../kafka/clients/consumer/MockConsumer.java | 2 +- .../consumer/internals/AbstractCoordinator.java | 76 +++++++++++++++++--- .../consumer/internals/ConsumerCoordinator.java | 28 ++++---- .../internals/ConsumerNetworkClient.java | 15 ++-- .../apache/kafka/common/protocol/Protocol.java | 10 +-- .../common/requests/LeaveGroupRequest.java | 16 ++--- .../internals/ConsumerCoordinatorTest.java | 66 ++++++++++++++++- .../runtime/distributed/WorkerCoordinator.java | 9 +-- .../src/main/scala/kafka/server/KafkaApis.scala | 2 +- 11 files changed, 190 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index a3d8776..c9f114d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -12,18 +12,18 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + import java.io.Closeable; import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.annotation.InterfaceStability; - /** * @see KafkaConsumer * @see MockConsumer http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index a6be519..f3d2e15 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -727,7 +727,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { try { log.debug("Unsubscribed all topics or patterns and assigned partitions"); this.subscriptions.unsubscribe(); - this.coordinator.resetGeneration(); + this.coordinator.maybeLeaveGroup(false); this.metadata.needMetadataForAllTopics(false); } finally { release(); @@ -790,11 +790,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { throw new IllegalArgumentException("Timeout must not be negative"); // poll for new data until the timeout expires + long start = time.milliseconds(); long remaining = timeout; - while (remaining >= 0) { - long start = time.milliseconds(); + do { Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); - if (!records.isEmpty()) { // if data is available, then return it, but first send off the // next round of fetches to enable pipelining while the user is @@ -804,8 +803,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { return new ConsumerRecords<>(records); } - remaining -= time.milliseconds() - start; - } + long elapsed = time.milliseconds() - start; + remaining = timeout - elapsed; + } while (remaining > 0); return ConsumerRecords.empty(); } finally { @@ -1157,6 +1157,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } } + /** + * Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is + * enabled, this will commit the current offsets. + */ @Override public void close() { acquire(); @@ -1179,7 +1183,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { private void close(boolean swallowException) { log.trace("Closing the Kafka consumer."); - AtomicReference<Throwable> firstException = new AtomicReference<Throwable>(); + AtomicReference<Throwable> firstException = new AtomicReference<>(); this.closed = true; ClientUtils.closeQuietly(coordinator, "coordinator", firstException); ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 25c0c2c..894bc93 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -31,8 +31,8 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.regex.Pattern; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; /** * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 781ff78..e9af6c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -37,6 +37,8 @@ import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.JoinGroupResponse; +import org.apache.kafka.common.requests.LeaveGroupRequest; +import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; @@ -45,6 +47,7 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -77,7 +80,7 @@ import java.util.concurrent.TimeUnit; * {@link #onJoinComplete(int, String, String, ByteBuffer)}. * */ -public abstract class AbstractCoordinator { +public abstract class AbstractCoordinator implements Closeable { private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class); @@ -196,15 +199,6 @@ public abstract class AbstractCoordinator { } /** - * Reset the generation/memberId tracked by this member - */ - public void resetGeneration() { - this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID; - this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; - rejoinNeeded = true; - } - - /** * Ensure that the group is active (i.e. joined and synced) */ public void ensureActiveGroup() { @@ -514,7 +508,6 @@ public abstract class AbstractCoordinator { return false; } - /** * Mark the current coordinator as dead. */ @@ -526,6 +519,67 @@ public abstract class AbstractCoordinator { } /** + * Close the coordinator, waiting if needed to send LeaveGroup. + */ + @Override + public void close() { + maybeLeaveGroup(true); + } + + /** + * Leave the current group and reset local generation/memberId. + */ + public void maybeLeaveGroup(boolean awaitResponse) { + if (!coordinatorUnknown() && generation > 0) { + // this is a minimal effort attempt to leave the group. we do not + // attempt any resending if the request fails or times out. + sendLeaveGroupRequest(awaitResponse); + } + + this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID; + this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; + rejoinNeeded = true; + } + + private void sendLeaveGroupRequest(boolean awaitResponse) { + LeaveGroupRequest request = new LeaveGroupRequest(groupId, memberId); + RequestFuture<Void> future = client.send(coordinator, ApiKeys.LEAVE_GROUP, request) + .compose(new LeaveGroupResponseHandler()); + + future.addListener(new RequestFutureListener<Void>() { + @Override + public void onSuccess(Void value) {} + + @Override + public void onFailure(RuntimeException e) { + log.info("LeaveGroup request failed with error", e); + } + }); + + if (awaitResponse) + client.poll(future); + else + client.poll(future, 0); + } + + private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> { + @Override + public LeaveGroupResponse parse(ClientResponse response) { + return new LeaveGroupResponse(response.responseBody()); + } + + @Override + public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) { + // process the response + short errorCode = leaveResponse.errorCode(); + if (errorCode == Errors.NONE.code()) + future.complete(null); + else + future.raise(Errors.forCode(errorCode)); + } + } + + /** * Send a heartbeat request now (visible only for testing). */ public RequestFuture<Void> sendHeartbeatRequest() { http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index c7323cb..25d389c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -15,9 +15,6 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.common.errors.GroupAuthorizationException; -import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment; @@ -26,6 +23,9 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -45,7 +45,6 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -58,7 +57,7 @@ import java.util.Set; /** * This class manages the coordination process with the consumer coordinator. */ -public final class ConsumerCoordinator extends AbstractCoordinator implements Closeable { +public final class ConsumerCoordinator extends AbstractCoordinator { private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class); @@ -305,15 +304,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl @Override public void close() { - // commit offsets prior to closing if auto-commit enabled - while (true) { - try { - maybeAutoCommitOffsetsSync(); - return; - } catch (WakeupException e) { - // ignore wakeups while closing to ensure we have a chance to commit - continue; + try { + while (true) { + try { + maybeAutoCommitOffsetsSync(); + return; + } catch (WakeupException e) { + // ignore wakeups while closing to ensure we have a chance to commit + continue; + } } + } finally { + super.close(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 4757fc4..f1f1cc7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -17,8 +17,8 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.RequestCompletionHandler; -import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.RequestHeader; @@ -162,12 +162,15 @@ public class ConsumerNetworkClient implements Closeable { * @throws WakeupException if {@link #wakeup()} is called from another thread */ public boolean poll(RequestFuture<?> future, long timeout) { - long now = time.milliseconds(); - long deadline = now + timeout; - while (!future.isDone() && now < deadline) { - poll(deadline - now, now); + long begin = time.milliseconds(); + long remaining = timeout; + long now = begin; + do { + poll(remaining, now); now = time.milliseconds(); - } + long elapsed = now - begin; + remaining = timeout - elapsed; + } while (!future.isDone() && remaining > 0); return future.isDone(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 00560db..ff844e7 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -528,10 +528,10 @@ public class Protocol { public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0}; /* Heartbeat api */ - public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."), + public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."), new Field("group_generation_id", INT32, - "The generation of the consumer group."), + "The generation of the group."), new Field("member_id", STRING, "The member id assigned by the group coordinator.")); @@ -542,10 +542,10 @@ public class Protocol { public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0}; /* Leave group api */ - public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."), - new Field("consumer_id", + public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."), + new Field("member_id", STRING, - "The consumer id assigned by the group coordinator.")); + "The member id assigned by the group coordinator.")); public static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16)); http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java index fcc056a..05bdf90 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java @@ -23,23 +23,23 @@ public class LeaveGroupRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEAVE_GROUP.id); private static final String GROUP_ID_KEY_NAME = "group_id"; - private static final String CONSUMER_ID_KEY_NAME = "consumer_id"; + private static final String MEMBER_ID_KEY_NAME = "member_id"; private final String groupId; - private final String consumerId; + private final String memberId; - public LeaveGroupRequest(String groupId, String consumerId) { + public LeaveGroupRequest(String groupId, String memberId) { super(new Struct(CURRENT_SCHEMA)); struct.set(GROUP_ID_KEY_NAME, groupId); - struct.set(CONSUMER_ID_KEY_NAME, consumerId); + struct.set(MEMBER_ID_KEY_NAME, memberId); this.groupId = groupId; - this.consumerId = consumerId; + this.memberId = memberId; } public LeaveGroupRequest(Struct struct) { super(struct); groupId = struct.getString(GROUP_ID_KEY_NAME); - consumerId = struct.getString(CONSUMER_ID_KEY_NAME); + memberId = struct.getString(MEMBER_ID_KEY_NAME); } @Override @@ -57,8 +57,8 @@ public class LeaveGroupRequest extends AbstractRequest { return groupId; } - public String consumerId() { - return consumerId; + public String memberId() { + return memberId; } public static LeaveGroupRequest parse(ByteBuffer buffer, int versionId) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 7fd6d88..391f719 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -35,7 +35,10 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatResponse; +import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.JoinGroupResponse; +import org.apache.kafka.common.requests.LeaveGroupRequest; +import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetFetchResponse; @@ -362,6 +365,64 @@ public class ConsumerCoordinatorTest { } @Test + public void testLeaveGroupOnClose() { + final String consumerId = "consumer"; + + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.needReassignment(); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + coordinator.ensurePartitionAssignment(); + + final AtomicBoolean received = new AtomicBoolean(false); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + received.set(true); + LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body()); + return leaveRequest.memberId().equals(consumerId) && + leaveRequest.groupId().equals(groupId); + } + }, new LeaveGroupResponse(Errors.NONE.code()).toStruct()); + coordinator.close(); + assertTrue(received.get()); + } + + @Test + public void testMaybeLeaveGroup() { + final String consumerId = "consumer"; + + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.needReassignment(); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + coordinator.ensurePartitionAssignment(); + + final AtomicBoolean received = new AtomicBoolean(false); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + received.set(true); + LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body()); + return leaveRequest.memberId().equals(consumerId) && + leaveRequest.groupId().equals(groupId); + } + }, new LeaveGroupResponse(Errors.NONE.code()).toStruct()); + coordinator.maybeLeaveGroup(false); + assertTrue(received.get()); + assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, coordinator.memberId); + assertEquals(OffsetCommitRequest.DEFAULT_GENERATION_ID, coordinator.generation); + } + + @Test public void testRebalanceInProgressOnSyncGroup() { final String consumerId = "consumer"; @@ -543,7 +604,7 @@ public class ConsumerCoordinatorTest { } @Test - public void testResetGeneration() { + public void testCommitAfterLeaveGroup() { // enable auto-assignment subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); @@ -555,8 +616,9 @@ public class ConsumerCoordinatorTest { coordinator.ensurePartitionAssignment(); // now switch to manual assignment + client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct()); subscriptions.unsubscribe(); - coordinator.resetGeneration(); + coordinator.maybeLeaveGroup(false); subscriptions.assignFromUser(Arrays.asList(tp)); // the client should not reuse generation/memberId from auto-subscribed generation http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java index d11165c..c748971 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java @@ -242,14 +242,15 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested; } - @Override - public void close() { - } - public String memberId() { return this.memberId; } + @Override + public void close() { + super.close(); + } + private class CopycatWorkerCoordinatorMetrics { public final Metrics metrics; public final String metricGrpName; http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 21434f7..df064e4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -895,7 +895,7 @@ class KafkaApis(val requestChannel: RequestChannel, // let the coordinator to handle leave-group coordinator.handleLeaveGroup( leaveGroupRequest.groupId(), - leaveGroupRequest.consumerId(), + leaveGroupRequest.memberId(), sendResponseCallback) } }
