dajac commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1608287216
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2180,36 +2202,58 @@ private void cancelConsumerGroupRebalanceTimeout( } /** - * Schedules a sync timeout for the member. + * Schedules a join timeout for the member. * * @param groupId The group id. * @param memberId The member id. * @param rebalanceTimeoutMs The rebalance timeout. */ - private void scheduleConsumerGroupSyncTimeout( + private void scheduleConsumerGroupJoinTimeout( String groupId, String memberId, int rebalanceTimeoutMs ) { - String key = consumerGroupSyncKey(groupId, memberId); - timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { - try { - ConsumerGroup group = consumerGroup(groupId); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); - log.info("[GroupId {}] Member {} fenced from the group because its session expired.", - groupId, memberId); + timer.schedule( + consumerGroupJoinKey(groupId, memberId), + rebalanceTimeoutMs, + TimeUnit.MILLISECONDS, + true, + () -> consumerGroupFenceMemberOperation(groupId, memberId, "the member failed to join within timeout.") Review Comment: nit: `the classic member failed to join within the rebalance timeout`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat( } } + /** + * Handle a classic group HeartbeatRequest to a consumer group. A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. 
+ */ + private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeatToConsumerGroup( + ConsumerGroup group, + RequestContext context, + HeartbeatRequestData request + ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { + String groupId = request.groupId(); + String memberId = request.memberId(); + String instanceId = request.groupInstanceId(); + ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + + throwIfMemberDoesNotUseClassicProtocol(member); + throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + + scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + + Errors error = Errors.NONE; + if (member.memberEpoch() < group.groupEpoch() || + member.state() == MemberState.UNREVOKED_PARTITIONS || + (member.state() == MemberState.UNRELEASED_PARTITIONS && !group.hasUnreleasedPartitions(member))) { + error = Errors.REBALANCE_IN_PROGRESS; + scheduleConsumerGroupJoinTimeout(groupId, memberId, member.rebalanceTimeoutMs()); Review Comment: I think that we have an issue here. The issue is that the HB continues while the rebalance is on-going so it will keep overriding the timer. I wonder if we could add the timer only if it does not exist yet (e.g. `scheduleIfAbsent`). ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1753,6 +1753,7 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro CompletableFuture<Void> appendFuture = new CompletableFuture<>(); appendFuture.whenComplete((__, t) -> { if (t == null) { + cancelConsumerGroupJoinTimeout(groupId, response.memberId()); Review Comment: Could we cover this change in an existing unit test? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2089,25 +2123,13 @@ private void scheduleConsumerGroupSessionTimeout( String memberId, int sessionTimeoutMs ) { - String key = consumerGroupSessionTimeoutKey(groupId, memberId); - timer.schedule(key, sessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { - try { - ConsumerGroup group = consumerGroup(groupId); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); - log.info("[GroupId {}] Member {} fenced from the group because its session expired.", - groupId, memberId); - - return consumerGroupFenceMember(group, member, null); - } catch (GroupIdNotFoundException ex) { - log.debug("[GroupId {}] Could not fence {} because the group does not exist.", - groupId, memberId); - } catch (UnknownMemberIdException ex) { - log.debug("[GroupId {}] Could not fence {} because the member does not exist.", - groupId, memberId); - } - - return new CoordinatorResult<>(Collections.emptyList()); - }); + timer.schedule( + consumerGroupSessionTimeoutKey(groupId, memberId), + sessionTimeoutMs, + TimeUnit.MILLISECONDS, + true, + () -> consumerGroupFenceMemberOperation(groupId, memberId, "the member session expires.") Review Comment: nit: `expired`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -1300,4 +1300,28 @@ public boolean allMembersUseClassicProtocolExcept(String memberId) { return numClassicProtocolMembers() == members().size() - 1 && !getOrMaybeCreateMember(memberId, false).useClassicProtocol(); } + + /** + * Checks whether the member has any unreleased partition. + * + * @param member The member to check. + * @return A boolean indicating whether the member has partitions in the target + * assignment that hasn't been revoked by other members. 
+ */ + public boolean hasUnreleasedPartitions(ConsumerGroupMember member) { + if (member.state() == MemberState.UNRELEASED_PARTITIONS) { + for (Map.Entry<Uuid, Set<Integer>> entry : targetAssignment().get(member.memberId()).partitions().entrySet()) { + Uuid topicId = entry.getKey(); + Set<Integer> assignedPartitions = member.assignedPartitions().get(topicId); Review Comment: Should we use `getOrDefault` and remove `assignedPartitions == null` below? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -12403,6 +12397,241 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce ); } + @Test + public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + int sessionTimeout = 5000; + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Arrays.asList("foo"), + null, + Collections.emptyList() + ) + ))) + ); + + // Consumer group with a member using the classic protocol. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(sessionTimeout) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(10) + .build())) + .build(); + + // Heartbeat to schedule the session timeout. 
+ HeartbeatRequestData request = new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setGenerationId(10); + context.sendClassicGroupHeartbeat(request); + context.assertSessionTimeout(groupId, memberId, sessionTimeout); + + // Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + + HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); + assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); + context.assertSessionTimeout(groupId, memberId, sessionTimeout); + + // Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + + heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); + assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); + context.assertSessionTimeout(groupId, memberId, sessionTimeout); + } + + @Test + public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception { + String groupId = "group-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + int sessionTimeout = 5000; + int rebalanceTimeout = 10000; + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Arrays.asList("foo"), + null, + Collections.emptyList() + ) + ))) + ); + + // Member 1 has a member epoch smaller than the group epoch. 
+ ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setRebalanceTimeoutMs(rebalanceTimeout) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(sessionTimeout) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(9) + .build(); + + // Member 2 has unrevoked partition. + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setRebalanceTimeoutMs(rebalanceTimeout) + .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 0))) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(sessionTimeout) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(10) + .build(); + + // Member 3 is in UNRELEASED_PARTITIONS and all the partitions in its target assignment are free. + ConsumerGroupMember member3 = new ConsumerGroupMember.Builder(memberId3) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setRebalanceTimeoutMs(rebalanceTimeout) + .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0))) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(sessionTimeout) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(10) + .build(); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withMember(member3) + .withAssignment(memberId3, mkAssignment(mkTopicAssignment(barTopicId, 0, 1, 2)))) + .build(); + + Arrays.asList(memberId1, memberId2, memberId3).forEach(memberId -> { + CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> heartbeatResult = context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + 
.setMemberId(memberId) + .setGenerationId(memberId.equals(memberId1) ? 9 : 10) + ); + assertEquals(Collections.emptyList(), heartbeatResult.records()); + assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResult.response().errorCode()); + context.assertSessionTimeout(groupId, memberId, sessionTimeout); + context.assertJoinTimeout(groupId, memberId, rebalanceTimeout); + }); + } + + @Test + public void testClassicGroupHeartbeatToConsumerWithUnknownMember() { + String groupId = "group-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId("unknown-member-id") + .setGenerationId(10) + )); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId("unknown-member-id") + .setGroupInstanceId("unknown-instance-id") + .setGenerationId(10) + )); + } + + @Test + public void testClassicGroupHeartbeatToConsumerWithFencedInstanceId() { + String groupId = "group-id"; + String memberId = "member-id"; + String instanceId = "instance-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .setMemberEpoch(10) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(5000) + .setSupportedProtocols(Collections.emptyList()) + ) + .build())) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId("unknown-member-id") + .setGroupInstanceId(instanceId) + 
+ .setGenerationId(10) + )); + } + + @Test + public void testClassicGroupHeartbeatToConsumerWithIllegalGenerationId() { + String groupId = "group-id"; + String memberId = "member-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(10) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(5000) + .setSupportedProtocols(Collections.emptyList()) + ) + .build())) + .build(); + + assertThrows(IllegalGenerationException.class, () -> context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setGenerationId(9) + )); + } + + @Test + public void testClassicGroupHeartbeatToConsumerWithMemberNotUsingClassicProtocol() { + String groupId = "group-id"; + String memberId = "member-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(10) + .build())) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setGenerationId(10) + )); + } + Review Comment: * Could we add a test which ensures that the member is fenced when the session times out? * Could we also add one, if not already present, which ensures that the member is fenced if it does not rejoin in time? * Do we need to update the two "rebalance" unit tests covering the eager and cooperative protocol too? I am not sure if it is necessary.
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat( } } + /** + * Handle a classic group HeartbeatRequest to a consumer group. A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. + */ + private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeatToConsumerGroup( + ConsumerGroup group, + RequestContext context, + HeartbeatRequestData request + ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { + String groupId = request.groupId(); + String memberId = request.memberId(); + String instanceId = request.groupInstanceId(); + ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + + throwIfMemberDoesNotUseClassicProtocol(member); + throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + + scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + + Errors error = Errors.NONE; + if (member.memberEpoch() < group.groupEpoch() || + member.state() == MemberState.UNREVOKED_PARTITIONS || + (member.state() == MemberState.UNRELEASED_PARTITIONS && !group.hasUnreleasedPartitions(member))) { Review Comment: Let's add a comment to explain this. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -1300,4 +1300,28 @@ public boolean allMembersUseClassicProtocolExcept(String memberId) { return numClassicProtocolMembers() == members().size() - 1 && !getOrMaybeCreateMember(memberId, false).useClassicProtocol(); } + + /** + * Checks whether the member has any unreleased partition. + * + * @param member The member to check. + * @return A boolean indicating whether the member has partitions in the target + * assignment that hasn't been revoked by other members. + */ + public boolean hasUnreleasedPartitions(ConsumerGroupMember member) { Review Comment: I wonder if we could call it something like `waitingOnUnreleasedPartition`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -420,12 +420,11 @@ public CompletableFuture<HeartbeatResponseData> heartbeat( ); } - // Using a read operation is okay here as we ignore the last committed offset in the snapshot registry. - // This means we will read whatever is in the latest snapshot, which is how the old coordinator behaves. - return runtime.scheduleReadOperation( + return runtime.scheduleWriteOperation( "classic-group-heartbeat", topicPartitionFor(request.groupId()), - (coordinator, __) -> coordinator.classicGroupHeartbeat(context, request) + Duration.ofMillis(config.offsetCommitTimeoutMs), Review Comment: I think that we should actually not use `offsetCommitTimeoutMs` as timeout for any operations except the one writing offsets. I think that we used it because we had no others. We can address this separately. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -4209,31 +4241,67 @@ private void removePendingSyncMember( * @param context The request context. * @param request The actual Heartbeat request. * - * @return The Heartbeat response. 
+ * @return The coordinator result that contains the heartbeat response. */ - public HeartbeatResponseData classicGroupHeartbeat( + public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeat( Review Comment: We don't generate any records. However, as we access the uncommitted state, we must use a write operation to ensure that we don't return before the state read is committed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org