dajac commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1608287216


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2180,36 +2202,58 @@ private void cancelConsumerGroupRebalanceTimeout(
     }
 
     /**
-     * Schedules a sync timeout for the member.
+     * Schedules a join timeout for the member.
      *
      * @param groupId               The group id.
      * @param memberId              The member id.
      * @param rebalanceTimeoutMs    The rebalance timeout.
      */
-    private void scheduleConsumerGroupSyncTimeout(
+    private void scheduleConsumerGroupJoinTimeout(
         String groupId,
         String memberId,
         int rebalanceTimeoutMs
     ) {
-        String key = consumerGroupSyncKey(groupId, memberId);
-        timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
-            try {
-                ConsumerGroup group = consumerGroup(groupId);
-                ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
-                log.info("[GroupId {}] Member {} fenced from the group because 
its session expired.",
-                    groupId, memberId);
+        timer.schedule(
+            consumerGroupJoinKey(groupId, memberId),
+            rebalanceTimeoutMs,
+            TimeUnit.MILLISECONDS,
+            true,
+            () -> consumerGroupFenceMemberOperation(groupId, memberId, "the 
member failed to join within timeout.")

Review Comment:
   nit: `the classic member failed to join within the rebalance timeout`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat(
         }
     }
 
+    /**
+     * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+     * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+     * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+     * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual Heartbeat request.
+     *
+     * @return The coordinator result that contains the heartbeat response.
+     */
+    private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> 
classicGroupHeartbeatToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        HeartbeatRequestData request
+    ) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+        ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+        scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+        Errors error = Errors.NONE;
+        if (member.memberEpoch() < group.groupEpoch() ||
+            member.state() == MemberState.UNREVOKED_PARTITIONS ||
+            (member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.hasUnreleasedPartitions(member))) {
+            error = Errors.REBALANCE_IN_PROGRESS;
+            scheduleConsumerGroupJoinTimeout(groupId, memberId, 
member.rebalanceTimeoutMs());

Review Comment:
   I think that we have an issue here. The issue is that the HB continues while 
the rebalance is on-going so it will keep overriding the timer. I wonder if we 
could add the timer only if it does not exist yet (e.g. `scheduleIfAbsent`).



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1753,6 +1753,7 @@ private CoordinatorResult<Void, CoordinatorRecord> 
classicGroupJoinToConsumerGro
         CompletableFuture<Void> appendFuture = new CompletableFuture<>();
         appendFuture.whenComplete((__, t) -> {
             if (t == null) {
+                cancelConsumerGroupJoinTimeout(groupId, response.memberId());

Review Comment:
   Could we cover this change in an existing unit test?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2089,25 +2123,13 @@ private void scheduleConsumerGroupSessionTimeout(
         String memberId,
         int sessionTimeoutMs
     ) {
-        String key = consumerGroupSessionTimeoutKey(groupId, memberId);
-        timer.schedule(key, sessionTimeoutMs, TimeUnit.MILLISECONDS, true, () 
-> {
-            try {
-                ConsumerGroup group = consumerGroup(groupId);
-                ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
-                log.info("[GroupId {}] Member {} fenced from the group because 
its session expired.",
-                    groupId, memberId);
-
-                return consumerGroupFenceMember(group, member, null);
-            } catch (GroupIdNotFoundException ex) {
-                log.debug("[GroupId {}] Could not fence {} because the group 
does not exist.",
-                    groupId, memberId);
-            } catch (UnknownMemberIdException ex) {
-                log.debug("[GroupId {}] Could not fence {} because the member 
does not exist.",
-                    groupId, memberId);
-            }
-
-            return new CoordinatorResult<>(Collections.emptyList());
-        });
+        timer.schedule(
+            consumerGroupSessionTimeoutKey(groupId, memberId),
+            sessionTimeoutMs,
+            TimeUnit.MILLISECONDS,
+            true,
+            () -> consumerGroupFenceMemberOperation(groupId, memberId, "the 
member session expires.")

Review Comment:
   nit: `expired`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -1300,4 +1300,28 @@ public boolean allMembersUseClassicProtocolExcept(String 
memberId) {
         return numClassicProtocolMembers() == members().size() - 1 &&
             !getOrMaybeCreateMember(memberId, false).useClassicProtocol();
     }
+
+    /**
+     * Checks whether the member has any unreleased partition.
+     *
+     * @param member The member to check.
+     * @return A boolean indicating whether the member has partitions in the 
target
+     *         assignment that hasn't been revoked by other members.
+     */
+    public boolean hasUnreleasedPartitions(ConsumerGroupMember member) {
+        if (member.state() == MemberState.UNRELEASED_PARTITIONS) {
+            for (Map.Entry<Uuid, Set<Integer>> entry : 
targetAssignment().get(member.memberId()).partitions().entrySet()) {
+                Uuid topicId = entry.getKey();
+                Set<Integer> assignedPartitions = 
member.assignedPartitions().get(topicId);

Review Comment:
   Should we use `getOrDefault` and remove `assignedPartitions == null` below?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -12403,6 +12397,241 @@ public void 
testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
         );
     }
 
+    @Test
+    public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        int sessionTimeout = 5000;
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),
+                        null,
+                        Collections.emptyList()
+                    )
+                )))
+        );
+
+        // Consumer group with a member using the classic protocol.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setClassicMemberMetadata(
+                        new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                            .setSessionTimeoutMs(sessionTimeout)
+                            .setSupportedProtocols(protocols)
+                    )
+                    .setMemberEpoch(10)
+                    .build()))
+            .build();
+
+        // Heartbeat to schedule the session timeout.
+        HeartbeatRequestData request = new HeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setGenerationId(10);
+        context.sendClassicGroupHeartbeat(request);
+        context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+        // Advance clock by 1/2 of session timeout.
+        
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+        assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+        context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+        // Advance clock by 1/2 of session timeout.
+        
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+        heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+        assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+        context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+    }
+
+    @Test
+    public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() 
throws Exception {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        int sessionTimeout = 5000;
+        int rebalanceTimeout = 10000;
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),
+                        null,
+                        Collections.emptyList()
+                    )
+                )))
+        );
+
+        // Member 1 has a member epoch smaller than the group epoch.
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setRebalanceTimeoutMs(rebalanceTimeout)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(sessionTimeout)
+                    .setSupportedProtocols(protocols)
+            )
+            .setMemberEpoch(9)
+            .build();
+
+        // Member 2 has unrevoked partition.
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.UNREVOKED_PARTITIONS)
+            .setRebalanceTimeoutMs(rebalanceTimeout)
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(sessionTimeout)
+                    .setSupportedProtocols(protocols)
+            )
+            .setMemberEpoch(10)
+            .build();
+
+        // Member 3 is in UNRELEASED_PARTITIONS and all the partitions in its 
target assignment are free.
+        ConsumerGroupMember member3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setState(MemberState.UNRELEASED_PARTITIONS)
+            .setRebalanceTimeoutMs(rebalanceTimeout)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 
0)))
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(sessionTimeout)
+                    .setSupportedProtocols(protocols)
+            )
+            .setMemberEpoch(10)
+            .build();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withMember(member3)
+                .withAssignment(memberId3, 
mkAssignment(mkTopicAssignment(barTopicId, 0, 1, 2))))
+            .build();
+
+        Arrays.asList(memberId1, memberId2, memberId3).forEach(memberId -> {
+            CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> 
heartbeatResult = context.sendClassicGroupHeartbeat(
+                new HeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setGenerationId(memberId.equals(memberId1) ? 9 : 10)
+            );
+            assertEquals(Collections.emptyList(), heartbeatResult.records());
+            assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResult.response().errorCode());
+            context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+            context.assertJoinTimeout(groupId, memberId, rebalanceTimeout);
+        });
+    }
+
+    @Test
+    public void testClassicGroupHeartbeatToConsumerWithUnknownMember() {
+        String groupId = "group-id";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
+            .build();
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupHeartbeat(
+            new HeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId("unknown-member-id")
+                .setGenerationId(10)
+        ));
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupHeartbeat(
+            new HeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId("unknown-member-id")
+                .setGroupInstanceId("unknown-instance-id")
+                .setGenerationId(10)
+        ));
+    }
+
+    @Test
+    public void testClassicGroupHeartbeatToConsumerWithFencedInstanceId() {
+        String groupId = "group-id";
+        String memberId = "member-id";
+        String instanceId = "instance-id";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setInstanceId(instanceId)
+                    .setMemberEpoch(10)
+                    .setClassicMemberMetadata(
+                        new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                            .setSessionTimeoutMs(5000)
+                            .setSupportedProtocols(Collections.emptyList())
+                    )
+                    .build()))
+            .build();
+
+        assertThrows(FencedInstanceIdException.class, () -> 
context.sendClassicGroupHeartbeat(
+            new HeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId("unknown-member-id")
+                .setGroupInstanceId(instanceId)
+                .setGenerationId(10)
+        ));
+    }
+
+    @Test
+    public void testClassicGroupHeartbeatToConsumerWithIllegalGenerationId() {
+        String groupId = "group-id";
+        String memberId = "member-id";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setClassicMemberMetadata(
+                        new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                            .setSessionTimeoutMs(5000)
+                            .setSupportedProtocols(Collections.emptyList())
+                    )
+                    .build()))
+            .build();
+
+        assertThrows(IllegalGenerationException.class, () -> 
context.sendClassicGroupHeartbeat(
+            new HeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setGenerationId(9)
+        ));
+    }
+
+    @Test
+    public void 
testClassicGroupHeartbeatToConsumerWithMemberNotUsingClassicProtocol() {
+        String groupId = "group-id";
+        String memberId = "member-id";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .build()))
+            .build();
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupHeartbeat(
+            new HeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setGenerationId(10)
+        ));
+    }
+

Review Comment:
   * Could we add a test which ensures that the member is fenced when the 
session timeouts?
   * Could we also add one, if not already present, which ensures that the 
member is fenced if it does not rejoin in time?
   * Do we need to update the two "rebalance" unit tests coving the eager and 
cooperative protocol too? I am not sure if it is necessary.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat(
         }
     }
 
+    /**
+     * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+     * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+     * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+     * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual Heartbeat request.
+     *
+     * @return The coordinator result that contains the heartbeat response.
+     */
+    private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> 
classicGroupHeartbeatToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        HeartbeatRequestData request
+    ) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+        ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+        scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+        Errors error = Errors.NONE;
+        if (member.memberEpoch() < group.groupEpoch() ||
+            member.state() == MemberState.UNREVOKED_PARTITIONS ||
+            (member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.hasUnreleasedPartitions(member))) {

Review Comment:
   Let's add a comment to explain this.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -1300,4 +1300,28 @@ public boolean allMembersUseClassicProtocolExcept(String 
memberId) {
         return numClassicProtocolMembers() == members().size() - 1 &&
             !getOrMaybeCreateMember(memberId, false).useClassicProtocol();
     }
+
+    /**
+     * Checks whether the member has any unreleased partition.
+     *
+     * @param member The member to check.
+     * @return A boolean indicating whether the member has partitions in the 
target
+     *         assignment that hasn't been revoked by other members.
+     */
+    public boolean hasUnreleasedPartitions(ConsumerGroupMember member) {

Review Comment:
   I wonder if we could call it something like `waitingOnUnreleasedPartition`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -420,12 +420,11 @@ public CompletableFuture<HeartbeatResponseData> heartbeat(
             );
         }
 
-        // Using a read operation is okay here as we ignore the last committed 
offset in the snapshot registry.
-        // This means we will read whatever is in the latest snapshot, which 
is how the old coordinator behaves.
-        return runtime.scheduleReadOperation(
+        return runtime.scheduleWriteOperation(
             "classic-group-heartbeat",
             topicPartitionFor(request.groupId()),
-            (coordinator, __) -> coordinator.classicGroupHeartbeat(context, 
request)
+            Duration.ofMillis(config.offsetCommitTimeoutMs),

Review Comment:
   I think that we should actually not use `offsetCommitTimeoutMs` as timeout 
for any operations except the one writing offsets. I think that we used it 
because we had no others. We can address this separately.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4209,31 +4241,67 @@ private void removePendingSyncMember(
      * @param context        The request context.
      * @param request        The actual Heartbeat request.
      *
-     * @return The Heartbeat response.
+     * @return The coordinator result that contains the heartbeat response.
      */
-    public HeartbeatResponseData classicGroupHeartbeat(
+    public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> 
classicGroupHeartbeat(

Review Comment:
   We don't generate any records. However, as we access the uncommitted state, 
we must use a write operation to ensure that we don't return before the state 
read is committed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to