jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1607136651


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -420,12 +420,11 @@ public CompletableFuture<HeartbeatResponseData> heartbeat(
             );
         }
 
-        // Using a read operation is okay here as we ignore the last committed offset in the snapshot registry.
-        // This means we will read whatever is in the latest snapshot, which is how the old coordinator behaves.
-        return runtime.scheduleReadOperation(
+        return runtime.scheduleWriteOperation(
             "classic-group-heartbeat",
             topicPartitionFor(request.groupId()),
-            (coordinator, __) -> coordinator.classicGroupHeartbeat(context, request)
+            Duration.ofMillis(config.offsetCommitTimeoutMs),

Review Comment:
   Not necessarily a comment for this PR, but I wonder if we should rename 
this config, since it is now being used for all writes and not only offset commits.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4209,31 +4241,67 @@ private void removePendingSyncMember(
      * @param context        The request context.
      * @param request        The actual Heartbeat request.
      *
-     * @return The Heartbeat response.
+     * @return The coordinator result that contains the heartbeat response.
      */
-    public HeartbeatResponseData classicGroupHeartbeat(
+    public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeat(

Review Comment:
   Maybe I'm missing something, but I don't see where we actually initialize 
the CoordinatorResult with records to write to the log.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat(
         }
     }
 
+    /**
+     * Handle a classic group HeartbeatRequest to a consumer group. A response with
+     * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the
+     * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in
+     * UNRELEASED_PARTITIONS and all its partitions pending assignment are free.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual Heartbeat request.
+     *
+     * @return The coordinator result that contains the heartbeat response.
+     */
+    private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeatToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        HeartbeatRequestData request
+    ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+        ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId);
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId());
+
+        scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
+
+        Errors error = Errors.NONE;
+        if (member.memberEpoch() < group.groupEpoch() ||
+            member.state() == MemberState.UNREVOKED_PARTITIONS ||
+            (member.state() == MemberState.UNRELEASED_PARTITIONS && !group.hasUnreleasedPartitions(member))) {
+            error = Errors.REBALANCE_IN_PROGRESS;
+            scheduleConsumerGroupJoinTimeout(groupId, memberId, member.rebalanceTimeoutMs());

Review Comment:
   So we cancel the join timeout when we first convert to a consumer group, 
and then, when the group epoch is bumped, we tell the classic group member 
that a rebalance is in progress and that it should send a join request. Is my 
understanding correct?
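
For readers following the question above, the condition in the quoted hunk can be isolated as a small standalone sketch. This is not the PR's code: `HeartbeatRebalanceSketch`, `shouldSignalRebalance`, and the trimmed-down `MemberState` enum are hypothetical stand-ins, and the boolean `hasUnreleasedPartitions` is assumed to carry the result of `group.hasUnreleasedPartitions(member)` from the diff. It only mirrors the three cases listed in the Javadoc.

```java
public class HeartbeatRebalanceSketch {
    // Hypothetical stand-in for the member states named in the diff.
    public enum MemberState { STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS }

    /**
     * Returns true when, per the Javadoc in the hunk, the heartbeat response
     * would carry REBALANCE_IN_PROGRESS:
     *   1) the member epoch is smaller than the group epoch,
     *   2) the member is in UNREVOKED_PARTITIONS, or
     *   3) the member is in UNRELEASED_PARTITIONS and none of its pending
     *      partitions is still held by another member.
     */
    public static boolean shouldSignalRebalance(
        int memberEpoch,
        int groupEpoch,
        MemberState state,
        boolean hasUnreleasedPartitions // assumed: result of group.hasUnreleasedPartitions(member)
    ) {
        return memberEpoch < groupEpoch
            || state == MemberState.UNREVOKED_PARTITIONS
            || (state == MemberState.UNRELEASED_PARTITIONS && !hasUnreleasedPartitions);
    }

    public static void main(String[] args) {
        // Group epoch was bumped past the member epoch: member should rejoin.
        System.out.println(shouldSignalRebalance(4, 5, MemberState.STABLE, false)); // true
        // Member is caught up and stable: plain heartbeat.
        System.out.println(shouldSignalRebalance(5, 5, MemberState.STABLE, false)); // false
        // Waiting on partitions that are now free: member should rejoin.
        System.out.println(shouldSignalRebalance(5, 5, MemberState.UNRELEASED_PARTITIONS, false)); // true
        // Waiting on partitions still owned elsewhere: keep waiting.
        System.out.println(shouldSignalRebalance(5, 5, MemberState.UNRELEASED_PARTITIONS, true)); // false
    }
}
```

In the diff, the join timeout is scheduled only on the branch where this predicate holds, which is the behavior the question above is trying to confirm.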



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
