dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1246646144


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -728,6 +794,81 @@ public void replay(
             }
             consumerGroup.removeMember(memberId);
         }
+
+        updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());

Review Comment:
   `groupsByTopics` is a timeline data structure, and we update it when a 
ConsumerGroupMemberMetadataKey/Value record is replayed. If the record can't 
be written, the state is reverted to the last written offset, which also 
reverts all of those pending changes.
   
   > i think i'm confused because in api handling (i.e. consumer group 
heartbeat) once we modify the timeline data structures we generate records to 
commit the offset in the timeline but here we do it in reverse
   
   In the API handling, we never update the timeline data structures directly. 
We only generate records, and the data structures are updated when those 
records are applied.
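
   To make the ordering concrete, here is a minimal sketch of the pattern, 
not the actual coordinator code. The names `TimelineSketch`, `Coordinator`, 
`SubscriptionRecord`, `handleHeartbeat`, `commit` and `revert` are all 
hypothetical, and a plain copy of the map stands in for Kafka's 
offset-based timeline snapshots:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TimelineSketch {
    // Hypothetical stand-in for a ConsumerGroupMemberMetadataKey/Value record.
    static final class SubscriptionRecord {
        final String groupId;
        final Set<String> oldTopics;
        final Set<String> newTopics;

        SubscriptionRecord(String groupId, Set<String> oldTopics, Set<String> newTopics) {
            this.groupId = groupId;
            this.oldTopics = oldTopics;
            this.newTopics = newTopics;
        }
    }

    static final class Coordinator {
        // topic -> groups subscribed to it (plays the role of groupsByTopics).
        private Map<String, Set<String>> groupsByTopics = new HashMap<>();
        // Copy of the state as of the last successfully written record.
        // Assumption: a full copy replaces the real snapshot mechanism.
        private Map<String, Set<String>> lastCommitted = new HashMap<>();

        // API handling: only produces records and never touches groupsByTopics.
        List<SubscriptionRecord> handleHeartbeat(String groupId, Set<String> oldTopics, Set<String> newTopics) {
            return List.of(new SubscriptionRecord(groupId, oldTopics, newTopics));
        }

        // Replay: the only place where the state is mutated.
        void replay(SubscriptionRecord record) {
            for (String topic : record.oldTopics) {
                if (record.newTopics.contains(topic)) continue;
                Set<String> groups = groupsByTopics.get(topic);
                if (groups == null) continue;
                groups.remove(record.groupId);
                if (groups.isEmpty()) groupsByTopics.remove(topic);
            }
            for (String topic : record.newTopics) {
                groupsByTopics.computeIfAbsent(topic, k -> new HashSet<>()).add(record.groupId);
            }
        }

        // Called once the replayed records have been written to the log.
        void commit() {
            lastCommitted = deepCopy(groupsByTopics);
        }

        // Called when the write fails: drops every change made since the last commit.
        void revert() {
            groupsByTopics = deepCopy(lastCommitted);
        }

        Set<String> groupsFor(String topic) {
            return groupsByTopics.getOrDefault(topic, Set.of());
        }

        private static Map<String, Set<String>> deepCopy(Map<String, Set<String>> source) {
            Map<String, Set<String>> copy = new HashMap<>();
            source.forEach((topic, groups) -> copy.put(topic, new HashSet<>(groups)));
            return copy;
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        // First heartbeat: records are generated, replayed, then written successfully.
        coordinator.handleHeartbeat("g1", Set.of(), Set.of("foo")).forEach(coordinator::replay);
        coordinator.commit();
        // Second heartbeat: records are replayed, but the write fails.
        coordinator.handleHeartbeat("g1", Set.of("foo"), Set.of("bar")).forEach(coordinator::replay);
        coordinator.revert();
        System.out.println("foo -> " + coordinator.groupsFor("foo"));
        System.out.println("bar -> " + coordinator.groupsFor("bar"));
    }
}
```

   The handler returns records without mutating anything, `replay` is the 
single place where `groupsByTopics` changes, and a failed write undoes 
whatever was replayed since the last commit.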


