lucasbru commented on code in PR #18276:
URL: https://github.com/apache/kafka/pull/18276#discussion_r1905223256


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Contains all information related to a member within a Streams group.
+ * <p>
+ * This class is immutable and is fully backed by records stored in the 
__consumer_offsets topic.
+ *
+ * @param memberId                      The ID of the member.
+ * @param memberEpoch                   The current epoch of the member.
+ * @param previousMemberEpoch           The previous epoch of the member.
+ * @param state                         The current state of the member.
+ * @param instanceId                    The instance ID of the member.
+ * @param rackId                        The rack ID of the member.
+ * @param clientId                      The client ID of the member.
+ * @param clientHost                    The host of the member.
+ * @param rebalanceTimeoutMs            The rebalance timeout in milliseconds.
+ * @param topologyEpoch                 The epoch of the topology the member 
uses.
+ * @param processId                     The ID of the Streams client that 
contains the member.
+ * @param userEndpoint                  The user endpoint exposed for 
Interactive Queries by the Streams client that
+ *                                      contains the member.
+ * @param clientTags                    Tags of the client of the member used 
for rack-aware assignment.
+ * @param assignedActiveTasks           Active tasks assigned to the member.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ * @param assignedStandbyTasks          Standby tasks assigned to the member.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ * @param assignedWarmupTasks           Warm-up tasks assigned to the member.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ * @param activeTasksPendingRevocation  Active tasks assigned to the member 
pending revocation.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ * @param standbyTasksPendingRevocation Standby tasks assigned to the member 
pending revocation.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ * @param warmupTasksPendingRevocation  Warm-up tasks assigned to the member 
pending revocation.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ */
+@SuppressWarnings("checkstyle:JavaNCSS")
+public record StreamsGroupMember(String memberId,
+                                 int memberEpoch,
+                                 int previousMemberEpoch,
+                                 MemberState state,
+                                 Optional<String> instanceId,
+                                 Optional<String> rackId,
+                                 String clientId,
+                                 String clientHost,
+                                 int rebalanceTimeoutMs,
+                                 int topologyEpoch,
+                                 String processId,
+                                 
Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint,
+                                 Map<String, String> clientTags,
+                                 Map<String, Set<Integer>> assignedActiveTasks,
+                                 Map<String, Set<Integer>> 
assignedStandbyTasks,
+                                 Map<String, Set<Integer>> assignedWarmupTasks,
+                                 Map<String, Set<Integer>> 
activeTasksPendingRevocation,
+                                 Map<String, Set<Integer>> 
standbyTasksPendingRevocation,
+                                 Map<String, Set<Integer>> 
warmupTasksPendingRevocation) {
+
+    public StreamsGroupMember {

Review Comment:
    In `replay`, we will always only get a part of the information in streams 
group member information, so sometimes, we may get the assignment first, 
sometimes we may get the member metadata (e.g. process ID) first. By requiring 
that all feilds are non-null, we'll have to always fill the data with "valid 
looking but incorrect data". For example, if we get the current assignment 
first, we will create a `StreamsGroupMetadata` whose `clientHost` is the empty 
string, the `instanceId` is specified as not-defined.
   
   In normal situation, we should eventually read the second record, that 
replaces the incorrect data (no instance ID defined) with the correct data (ah, 
it's actually a static member). I just wonder if this may be a source of hidden 
bugs, because we may accidentally use the "incorrect" data.
   
   Allowing null here would represent this intermediate state correctly - if we 
have only read one record (e.g. only the assignment, but no member metadata 
yet), we are actually in a partially initialized state, and the member metadata 
is null.
   
   I'm not sure, I'm actually saying we need to allow null here. I think the 
same applies to consumer groups / share groups. But I think it's worth thinking 
about it / keeping it in mind.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java:
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * StreamsGroupMember contains all the information related to a member within 
a Streams group. This class is immutable and is fully backed
+ * by records stored in the __consumer_offsets topic.
+ */
+public class StreamsGroupMember {
+
+  /**
+     * A builder that facilitates the creation of a new member or the update 
of an existing one.
+     * <p>
+     * Please refer to the javadoc of {{@link StreamsGroupMember}} for the 
definition of the fields.
+     */
+    public static class Builder {
+
+        private final String memberId;
+        private int memberEpoch = 0;
+        private int previousMemberEpoch = -1;
+        private MemberState state = MemberState.STABLE;
+        private String instanceId = null;
+        private String rackId = null;
+        private int rebalanceTimeoutMs = -1;
+        private String clientId = "";
+        private String clientHost = "";
+        private int topologyEpoch = -1;
+        private String processId;
+        private StreamsGroupMemberMetadataValue.Endpoint userEndpoint;
+        private Map<String, String> clientTags = Collections.emptyMap();

Review Comment:
   As pointed out above - there may be some improvements to be made, but note 
that sometimes we may get the assignment before the member metadata during 
replay, so we have to be able to create members without member metadata, so the 
"start with an empty member and update" approach seems consistent to me, 
because it works the same way, whether I get the assignment first or the member 
metadata. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to