Repository: kafka Updated Branches: refs/heads/trunk 5effe7239 -> 3620035c4
KAFKA-5611; AbstractCoordinator should handle wakeup raised from onJoinComplete Author: Jason Gustafson <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #3571 from hachikuji/KAFKA-5611 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3620035c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3620035c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3620035c Branch: refs/heads/trunk Commit: 3620035c45736b74e69921f7c50a1c3857aec334 Parents: 5effe72 Author: Jason Gustafson <[email protected]> Authored: Wed Jul 26 22:53:11 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Wed Jul 26 22:53:11 2017 -0700 ---------------------------------------------------------------------- .../consumer/ConsumerRebalanceListener.java | 16 ++++++++++ .../consumer/internals/AbstractCoordinator.java | 13 ++++++-- .../internals/AbstractCoordinatorTest.java | 32 ++++++++++++++++++++ 3 files changed, 58 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3620035c/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java index 3a3873a..845bff3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java @@ -87,8 +87,16 @@ public interface ConsumerRebalanceListener { * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} * <p> * <b>NOTE:</b> This method is only called before rebalances. 
It is not called prior to {@link KafkaConsumer#close()}. + * <p> + * It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible + * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException} + * to be raised from one of these nested invocations. In this case, the exception will be propagated to the current + * invocation of {@link KafkaConsumer#poll(long)} in which this callback is being executed. This means it is not + * necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread. * * @param partitions The list of partitions that were assigned to the consumer on the last rebalance + * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer} + * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer} */ void onPartitionsRevoked(Collection<TopicPartition> partitions); @@ -100,9 +108,17 @@ public interface ConsumerRebalanceListener { * It is guaranteed that all the processes in a consumer group will execute their * {@link #onPartitionsRevoked(Collection)} callback before any instance executes its * {@link #onPartitionsAssigned(Collection)} callback. + * <p> + * It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible + * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException} + * to be raised from one of these nested invocations. In this case, the exception will be propagated to the current + * invocation of {@link KafkaConsumer#poll(long)} in which this callback is being executed. This means it is not + * necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.
* * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously * assigned to the consumer) + * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer} + * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer} */ void onPartitionsAssigned(Collection<TopicPartition> partitions); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3620035c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 29acc25..0f594df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -182,7 +182,10 @@ public abstract class AbstractCoordinator implements Closeable { Map<String, ByteBuffer> allMemberMetadata); /** - * Invoked when a group member has successfully joined a group. + * Invoked when a group member has successfully joined a group. If this call is woken up (i.e. + * if the invocation raises {@link org.apache.kafka.common.errors.WakeupException}), then it + * will be retried on the next call to {@link #ensureActiveGroup()}. 
+ * * @param generation The generation that was joined * @param memberId The identifier for the local member in the group * @param protocol The protocol selected by the coordinator @@ -360,12 +363,16 @@ public abstract class AbstractCoordinator implements Closeable { RequestFuture<ByteBuffer> future = initiateJoinGroup(); client.poll(future); - resetJoinGroupFuture(); if (future.succeeded()) { - needsJoinPrepare = true; onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value()); + + // We reset the join group future only after the completion callback returns. This ensures + // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded. + resetJoinGroupFuture(); + needsJoinPrepare = true; } else { + resetJoinGroupFuture(); RuntimeException exception = future.exception(); if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || http://git-wip-us.apache.org/repos/asf/kafka/blob/3620035c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index dd1c79a..637c832 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -479,7 +479,36 @@ public class AbstractCoordinatorTest { assertEquals(0, coordinator.onJoinCompleteInvokes); assertFalse(heartbeatReceived.get()); + coordinator.ensureActiveGroup(); + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(1, coordinator.onJoinCompleteInvokes); + + awaitFirstHeartbeat(heartbeatReceived); + } + + @Test + public void 
testWakeupInOnJoinComplete() throws Exception { + setupCoordinator(RETRY_BACKOFF_MS); + + coordinator.wakeupOnJoinComplete = true; + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); + + try { + coordinator.ensureActiveGroup(); + fail("Should have woken up from ensureActiveGroup()"); + } catch (WakeupException e) { + } + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + // the join group completes in this poll() + coordinator.wakeupOnJoinComplete = false; consumerClient.poll(0); coordinator.ensureActiveGroup(); @@ -534,6 +563,7 @@ public class AbstractCoordinatorTest { private int onJoinPrepareInvokes = 0; private int onJoinCompleteInvokes = 0; + private boolean wakeupOnJoinComplete = false; public DummyCoordinator(ConsumerNetworkClient client, Metrics metrics, @@ -567,6 +597,8 @@ public class AbstractCoordinatorTest { @Override protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) { + if (wakeupOnJoinComplete) + throw new WakeupException(); onJoinCompleteInvokes++; } }
