Repository: kafka Updated Branches: refs/heads/trunk a4551773c -> 7eee11451
KAFKA-2753: improve SyncGroup error handling in client Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang Closes #433 from hachikuji/KAFKA-2753 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7eee1145 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7eee1145 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7eee1145 Branch: refs/heads/trunk Commit: 7eee11451e1f0d17efa27775becfb370a9894d56 Parents: a455177 Author: Jason Gustafson <[email protected]> Authored: Thu Nov 5 10:22:21 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Nov 5 10:22:21 2015 -0800 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 28 +++++++--- .../internals/ConsumerCoordinatorTest.java | 55 +++++++++++++++++++- 2 files changed, 76 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7eee1145/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index e9af6c8..44371cb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -417,15 +417,31 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) { - short errorCode = syncResponse.errorCode(); - if (errorCode == Errors.NONE.code()) { - future.complete(syncResponse.memberAssignment()); + Errors error = 
Errors.forCode(syncResponse.errorCode()); + if (error == Errors.NONE) { + log.debug("Received successful sync group response for group {}: {}", groupId, syncResponse.toStruct()); sensors.syncLatency.record(response.requestLatencyMs()); - } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { - future.raise(new GroupAuthorizationException(groupId)); + future.complete(syncResponse.memberAssignment()); } else { AbstractCoordinator.this.rejoinNeeded = true; - future.raise(Errors.forCode(errorCode)); + if (error == Errors.GROUP_AUTHORIZATION_FAILED) { + future.raise(new GroupAuthorizationException(groupId)); + } else if (error == Errors.REBALANCE_IN_PROGRESS) { + log.info("SyncGroup for group {} failed due to coordinator rebalance, rejoining the group", groupId); + future.raise(error); + } else if (error == Errors.UNKNOWN_MEMBER_ID + || error == Errors.ILLEGAL_GENERATION) { + log.info("SyncGroup for group {} failed due to {}, rejoining the group", groupId, error); + AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; + future.raise(error); + } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE + || error == Errors.NOT_COORDINATOR_FOR_GROUP) { + log.info("SyncGroup for group {} failed due to {}, will find new coordinator and rejoin", groupId, error); + coordinatorDead(); + future.raise(error); + } else { + future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.exception().getMessage())); + } } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7eee1145/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 391f719..8e47fc3 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ApiException; @@ -422,6 +423,52 @@ public class ConsumerCoordinatorTest { assertEquals(OffsetCommitRequest.DEFAULT_GENERATION_ID, coordinator.generation); } + @Test(expected = KafkaException.class) + public void testUnexpectedErrorOnSyncGroup() { + final String consumerId = "consumer"; + + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.needReassignment(); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // join initially, but let coordinator return an unexpected error on sync + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN.code())); + coordinator.ensurePartitionAssignment(); + } + + @Test + public void testUnknownMemberIdOnSyncGroup() { + final String consumerId = "consumer"; + + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.needReassignment(); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // join initially, but let coordinator return unknown member id + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); +
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_MEMBER_ID.code())); + + // now we should see a new join with the empty UNKNOWN_MEMBER_ID + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + JoinGroupRequest joinRequest = new JoinGroupRequest(request.request().body()); + return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID); + } + }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + + coordinator.ensurePartitionAssignment(); + + assertFalse(subscriptions.partitionAssignmentNeeded()); + assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); + } + @Test public void testRebalanceInProgressOnSyncGroup() { final String consumerId = "consumer"; @@ -461,7 +508,13 @@ public class ConsumerCoordinatorTest { client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.ILLEGAL_GENERATION.code())); // then let the full join/sync finish successfully - client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code())); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + JoinGroupRequest joinRequest = new JoinGroupRequest(request.request().body()); + return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID); + } + }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code())); client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); coordinator.ensurePartitionAssignment();
