[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191602652


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,1012 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ * This class holds metadata for a generic group where the
+ * member assignment is driven solely from the client side.
+ *
+ * The APIs members use to make changes to the group membership
+ * consist of JoinGroup, SyncGroup, and LeaveGroup.
+ */
+public class GenericGroup implements Group {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with a client id or a group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members awaiting a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to the number of members that support them.
+ */
+private final Map supportedProtocols = new HashMap<>();
+
+/**
+ * Members who have yet to sync with the group
+ * during the sync group pha

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191602126


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,998 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group where the
+ * member assignment is driven solely from the client side.
+ *
+ * The APIs members use to make changes to the group membership
+ * consist of JoinGroup, SyncGroup, and LeaveGroup.
+ */
+public class GenericGroup implements Group {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with a client id or a group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;

Review Comment:
   This is a bit awkward, as the existing GroupMetadata updates this field 
(`currentStateTimestamp`) when we read the group metadata record 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1225).
 
   
   So we should expect the new group metadata manager introduced in 
https://github.com/apache/kafka/pull/13639 to perform this update. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191374954


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GenericGroupTest {
+private final String protocolType = "consumer";
+private final String groupInstanceId = "groupInstanceId";
+private final String memberId = "memberId";
+private final String clientId = "clientId";
+private final String clientHost = "clientHost";
+private final int rebalanceTimeoutMs = 6;
+private final int sessionTimeoutMs = 1;
+
+
+private GenericGroup group = null;
+
+@BeforeEach
+public void initialize() {
+group = new GenericGroup(new LogContext(), "groupId", EMPTY, 
Time.SYSTEM);
+}
+
+@Test
+public void testCanRebalanceWhenStable() {
+assertTrue(group.canRebalance());
+}
+
+@Test
+public void testCanRebalanceWhenCompletingRebalance() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(COMPLETING_REBALANCE);
+assertTrue(group.canRebalance()); 
+}
+
+@Test
+public void testCannotRebalanceWhenPreparingRebalance() {
+group.transitionTo(PREPARING_REBALANCE);
+assertFalse(group.canRebalance());
+}
+
+@Test
+public void testCannotRebalanceWhenDead() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);
+group.transitionTo(DEAD);
+assertFalse(group.canRebalance());
+}
+
+@Test
+public void testStableToPreparingRebalanceTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+assertState(group, PREPARING_REBALANCE);
+}
+
+@Test
+public void testStableToDeadTransition() {
+group.transitionTo(DEAD);
+assertState(group, DEAD);
+}
+
+@Test
+public void testAwaitingRebalanceToPreparingRebalanceTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(COMPLETING_REBALANCE);
+group.transitionTo(PREPARING_REBALANCE);
+assertState(group, PREPARING_REBALANCE);
+}
+
+@Test
+public void testPreparingRebalanceToDeadTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(DEAD);
+assertState(group, DEAD);
+}
+
+@Test
+public void testPreparingRebalanceToEmptyTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);
+assertState(group, EMPTY);
+

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191359030


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GenericGroupTest {
+private final String protocolType = "consumer";
+private final String groupInstanceId = "groupInstanceId";
+private final String memberId = "memberId";
+private final String clientId = "clientId";
+private final String clientHost = "clientHost";
+private final int rebalanceTimeoutMs = 6;
+private final int sessionTimeoutMs = 1;
+
+
+private GenericGroup group = null;
+
+@BeforeEach
+public void initialize() {
+group = new GenericGroup(new LogContext(), "groupId", EMPTY, 
Time.SYSTEM);
+}
+
+@Test
+public void testCanRebalanceWhenStable() {
+assertTrue(group.canRebalance());
+}
+
+@Test
+public void testCanRebalanceWhenCompletingRebalance() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(COMPLETING_REBALANCE);
+assertTrue(group.canRebalance()); 
+}
+
+@Test
+public void testCannotRebalanceWhenPreparingRebalance() {
+group.transitionTo(PREPARING_REBALANCE);
+assertFalse(group.canRebalance());
+}
+
+@Test
+public void testCannotRebalanceWhenDead() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);
+group.transitionTo(DEAD);
+assertFalse(group.canRebalance());
+}
+
+@Test
+public void testStableToPreparingRebalanceTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+assertState(group, PREPARING_REBALANCE);
+}
+
+@Test
+public void testStableToDeadTransition() {
+group.transitionTo(DEAD);
+assertState(group, DEAD);
+}
+
+@Test
+public void testAwaitingRebalanceToPreparingRebalanceTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(COMPLETING_REBALANCE);
+group.transitionTo(PREPARING_REBALANCE);
+assertState(group, PREPARING_REBALANCE);
+}
+
+@Test
+public void testPreparingRebalanceToDeadTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(DEAD);
+assertState(group, DEAD);
+}
+
+@Test
+public void testPreparingRebalanceToEmptyTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);
+assertState(group, EMPTY);
+

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191340188


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,998 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group where the
+ * member assignment is driven solely from the client side.
+ *
+ * The APIs members use to make changes to the group membership
+ * consist of JoinGroup, SyncGroup, and LeaveGroup.
+ */
+public class GenericGroup implements Group {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with a client id or a group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members awaiting a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to the number of members that support them.
+ */
+private final Map supportedP

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191303709


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,998 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group where the
+ * member assignment is driven solely from the client side.
+ *
+ * The APIs members use to make changes to the group membership
+ * consist of JoinGroup, SyncGroup, and LeaveGroup.
+ */
+public class GenericGroup implements Group {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with a client id or a group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members awaiting a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to the number of members that support them.
+ */
+private final Map supportedP

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190519534


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group

Review Comment:
   The wording of this Javadoc line looks grammatically correct to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190517157


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned

Review Comment:
   this looks grammatically correct



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190204060


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;

Review Comment:
   It's only used to initialize `state`, so I removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190199871


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##
@@ -0,0 +1,876 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Stable;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GenericGroupTest {
+private final String protocolType = "consumer";
+private final String groupInstanceId = "groupInstanceId";
+private final String memberId = "memberId";
+private final String clientId = "clientId";
+private final String clientHost = "clientHost";
+private final int rebalanceTimeoutMs = 6;
+private final int sessionTimeoutMs = 1;
+
+
+private GenericGroup group = null;
+
+@BeforeEach
+public void initialize() {
+group = new GenericGroup(new LogContext(), "groupId", Empty, 
Time.SYSTEM);

Review Comment:
   I'll keep this as is, since this variable is used by all tests.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##
@@ -0,0 +1,876 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Completab

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190181383


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The lock used to synchronize the group.
+ */
+private final Lock lock = new ReentrantLock();
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingMembers = new HashSet<>();
+
+/**
+   

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190177045


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The lock used to synchronize the group.
+ */
+private final Lock lock = new ReentrantLock();
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingMembers = new HashSet<>();
+
+/**
+   

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190175577


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The lock used to synchronize the group.
+ */
+private final Lock lock = new ReentrantLock();
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingMembers = new HashSet<>();
+
+/**
+   

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190154104


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The lock used to synchronize the group.
+ */
+private final Lock lock = new ReentrantLock();
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingMembers = new HashSet<>();
+
+/**
+   

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1189924161


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The lock used to synchronize the group.
+ */
+private final Lock lock = new ReentrantLock();
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional<Long> currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional<String> protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional<String> protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional<String> leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map<String, GenericGroupMember> members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map<String, String> staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set<String> pendingMembers = new HashSet<>();
+
+/**
+   

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-09 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1188772104


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala:
##
@@ -160,6 +160,7 @@ class GroupMetadataTest {
 assertThrows(classOf[IllegalStateException], () => group.transitionTo(CompletingRebalance))
   }
 
+  @Test

Review Comment:
   Created a separate PR for this: https://github.com/apache/kafka/pull/13694



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org