dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1238417804


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -171,70 +260,152 @@ GroupMetadataManager build() {
     /**
      * The maximum number of members allowed in a single consumer group.
      */
-    private final int consumerGroupMaxSize;
+    private final int groupMaxSize;
 
     /**
      * The heartbeat interval for consumer groups.
      */
     private final int consumerGroupHeartbeatIntervalMs;
 
     /**
-     * The topics metadata (or image).
+     * The metadata image.
+     */
+    private MetadataImage metadataImage;
+
+    // Rest of the fields are used for the generic group APIs.
+
+    /**
+     * An empty result returned to the state machine. This means that
+     * there are no records to append to the log.
+     *
+     * Package private for testing.
+     */
+    static final CoordinatorResult<CompletableFuture<Errors>, Record> 
EMPTY_RESULT =
+        new CoordinatorResult<>(Collections.emptyList(), 
CompletableFuture.completedFuture(null));
+
+    /**
+     * Initial rebalance delay for members joining a generic group.
+     */
+    private final int initialRebalanceDelayMs;
+
+    /**
+     * The timeout used to wait for a new member in milliseconds.
+     */
+    private final int newMemberJoinTimeoutMs;
+
+    /**
+     * The group minimum session timeout.
+     */
+    private final int groupMinSessionTimeoutMs;
+
+    /**
+     * The group maximum session timeout.
+     */
+    private final int groupMaxSessionTimeoutMs;
+
+    /**
+     * The timer to add and cancel group operations.
      */
-    private TopicsImage topicsImage;
+    private final Timer<CompletableFuture<Errors>, Record> timer;
+
+    /**
+     * The time.
+     */
+    private final Time time;
 
     private GroupMetadataManager(
         SnapshotRegistry snapshotRegistry,
         LogContext logContext,
         List<PartitionAssignor> assignors,
-        TopicsImage topicsImage,
-        int consumerGroupMaxSize,
-        int consumerGroupHeartbeatIntervalMs
+        MetadataImage metadataImage,
+        TopicPartition topicPartition,
+        int groupMaxSize,
+        int consumerGroupHeartbeatIntervalMs,
+        int initialRebalanceDelayMs,
+        int newMemberJoinTimeoutMs,
+        int groupMinSessionTimeoutMs,
+        int groupMaxSessionTimeoutMs,
+        Timer<CompletableFuture<Errors>, Record> timer,
+        Time time
     ) {
+        this.logContext = logContext;
         this.log = logContext.logger(GroupMetadataManager.class);
         this.snapshotRegistry = snapshotRegistry;
-        this.topicsImage = topicsImage;
+        this.metadataImage = metadataImage;
         this.assignors = 
assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, 
Function.identity()));
+        this.topicPartition = topicPartition;
         this.defaultAssignor = assignors.get(0);
         this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.consumerGroupMaxSize = consumerGroupMaxSize;
+        this.groupMaxSize = groupMaxSize;
         this.consumerGroupHeartbeatIntervalMs = 
consumerGroupHeartbeatIntervalMs;
+        this.initialRebalanceDelayMs = initialRebalanceDelayMs;
+        this.newMemberJoinTimeoutMs = newMemberJoinTimeoutMs;
+        this.groupMinSessionTimeoutMs = groupMinSessionTimeoutMs;
+        this.groupMaxSessionTimeoutMs = groupMaxSessionTimeoutMs;
+        this.timer = timer;
+        this.time = time;
+    }
+
+    /**
+     * When a new metadata image is pushed.
+     *
+     * @param metadataImage The new metadata image.
+     */
+    public void onNewMetadataImage(MetadataImage metadataImage) {
+        this.metadataImage = metadataImage;
     }
 
     /**
      * Gets or maybe creates a consumer group.
      *
      * @param groupId           The group id.
+     * @param groupType         The group type (generic or consumer).
      * @param createIfNotExists A boolean indicating whether the group should 
be
      *                          created if it does not exist.
      *
      * @return A ConsumerGroup.
+     * @throws InvalidGroupIdException  if the group id is invalid.
      * @throws GroupIdNotFoundException if the group does not exist and 
createIfNotExists is false or
      *                                  if the group is not a consumer group.
      *
      * Package private for testing.
      */
-    ConsumerGroup getOrMaybeCreateConsumerGroup(
+    // Package private for testing.
+    Group getOrMaybeCreateGroup(

Review Comment:
   I am not convinced. The downside is that it will be harder to guarantee the 
uniqueness of the group id. It also means that we would have to check both maps 
for all other operations (e.g. list, delete, etc.). I think that it would be 
better to keep them in a single map.
   
   For this particular case, we could just have two methods: 
`getOrMaybeCreateConsumerGroup` and `getOrMaybeCreateGenericGroup`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to