lucasbru commented on code in PR #18276: URL: https://github.com/apache/kafka/pull/18276#discussion_r1905223256
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java: ########## @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Contains all information related to a member within a Streams group. + * <p> + * This class is immutable and is fully backed by records stored in the __consumer_offsets topic. + * + * @param memberId The ID of the member. + * @param memberEpoch The current epoch of the member. + * @param previousMemberEpoch The previous epoch of the member. + * @param state The current state of the member. + * @param instanceId The instance ID of the member. 
+ * @param rackId The rack ID of the member. + * @param clientId The client ID of the member. + * @param clientHost The host of the member. + * @param rebalanceTimeoutMs The rebalance timeout in milliseconds. + * @param topologyEpoch The epoch of the topology the member uses. + * @param processId The ID of the Streams client that contains the member. + * @param userEndpoint The user endpoint exposed for Interactive Queries by the Streams client that + * contains the member. + * @param clientTags Tags of the client of the member used for rack-aware assignment. + * @param assignedActiveTasks Active tasks assigned to the member. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param assignedStandbyTasks Standby tasks assigned to the member. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param assignedWarmupTasks Warm-up tasks assigned to the member. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param activeTasksPendingRevocation Active tasks assigned to the member pending revocation. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param standbyTasksPendingRevocation Standby tasks assigned to the member pending revocation. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param warmupTasksPendingRevocation Warm-up tasks assigned to the member pending revocation. + * The key of the map is the subtopology ID and the value is the set of partition IDs. 
+ */ +@SuppressWarnings("checkstyle:JavaNCSS") +public record StreamsGroupMember(String memberId, + int memberEpoch, + int previousMemberEpoch, + MemberState state, + Optional<String> instanceId, + Optional<String> rackId, + String clientId, + String clientHost, + int rebalanceTimeoutMs, + int topologyEpoch, + String processId, + Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint, + Map<String, String> clientTags, + Map<String, Set<Integer>> assignedActiveTasks, + Map<String, Set<Integer>> assignedStandbyTasks, + Map<String, Set<Integer>> assignedWarmupTasks, + Map<String, Set<Integer>> activeTasksPendingRevocation, + Map<String, Set<Integer>> standbyTasksPendingRevocation, + Map<String, Set<Integer>> warmupTasksPendingRevocation) { + + public StreamsGroupMember {
Review Comment: In `replay`, we only ever get part of the Streams group member information at a time, so sometimes we get the assignment first and sometimes the member metadata (e.g. the process ID) first. By requiring that all fields are non-null, we always have to fill in "valid-looking but incorrect" data. For example, if we get the current assignment first, we will create a `StreamsGroupMetadata` whose `clientHost` is the empty string and whose `instanceId` is marked as not defined. Normally, we should eventually read the second record, which replaces the incorrect data (no instance ID defined) with the correct data (ah, it's actually a static member). I just wonder whether this may be a source of hidden bugs, because we may accidentally use the "incorrect" data. Allowing null here would represent this intermediate state correctly: if we have only read one record (e.g. only the assignment, but no member metadata yet), we are actually in a partially initialized state, and the member metadata is null. I'm not sure I'm actually saying we need to allow null here. I think the same applies to consumer groups / share groups.
But I think it's worth thinking about it / keeping it in mind. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java: ########## @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * StreamsGroupMember contains all the information related to a member within a Streams group. This class is immutable and is fully backed + * by records stored in the __consumer_offsets topic. + */ +public class StreamsGroupMember { + + /** + * A builder that facilitates the creation of a new member or the update of an existing one. 
+ * <p> + * Please refer to the javadoc of {{@link StreamsGroupMember}} for the definition of the fields. + */ + public static class Builder { + + private final String memberId; + private int memberEpoch = 0; + private int previousMemberEpoch = -1; + private MemberState state = MemberState.STABLE; + private String instanceId = null; + private String rackId = null; + private int rebalanceTimeoutMs = -1; + private String clientId = ""; + private String clientHost = ""; + private int topologyEpoch = -1; + private String processId; + private StreamsGroupMemberMetadataValue.Endpoint userEndpoint; + private Map<String, String> clientTags = Collections.emptyMap();
Review Comment: As pointed out above, there may be some improvements to be made. Note, however, that during replay we may sometimes get the assignment before the member metadata, so we have to be able to create members without member metadata. The "start with an empty member and update" approach therefore seems consistent to me, because it works the same way whether I get the assignment first or the member metadata first.
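To make the replay-ordering concern from the two comments concrete, here is a minimal, self-contained sketch. It is not the code in this PR: `ReplaySketch`, `MemberMetadata`, `Member`, `replayMetadata`, `replayAssignment` and `isFullyInitialized` are hypothetical names, and the member is reduced to a few fields. It assumes the partially initialized state is modelled with an empty `Optional` rather than with null or placeholder values, and that each record type updates the member independently, so both arrival orders end in the same state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model for illustration; not the actual Kafka classes.
public class ReplaySketch {

    /** The metadata half of a member; it arrives in its own record. */
    record MemberMetadata(String clientHost, Optional<String> instanceId) { }

    /** A member whose metadata stays absent until the metadata record has been replayed. */
    record Member(String memberId,
                  Optional<MemberMetadata> metadata,
                  Map<String, Set<Integer>> assignedActiveTasks) {

        /** "Start with an empty member": no metadata, no tasks. */
        static Member empty(String memberId) {
            return new Member(memberId, Optional.empty(), Map.of());
        }

        Member withMetadata(MemberMetadata newMetadata) {
            return new Member(memberId, Optional.of(newMetadata), assignedActiveTasks);
        }

        Member withAssignment(Map<String, Set<Integer>> tasks) {
            return new Member(memberId, metadata, Map.copyOf(tasks));
        }

        /** False while only the assignment record has been seen. */
        boolean isFullyInitialized() {
            return metadata.isPresent();
        }
    }

    private final Map<String, Member> members = new HashMap<>();

    /** Replays a metadata record, creating an empty member first if none exists yet. */
    void replayMetadata(String memberId, MemberMetadata metadata) {
        members.compute(memberId, (id, old) ->
            (old == null ? Member.empty(id) : old).withMetadata(metadata));
    }

    /** Replays an assignment record, creating an empty member first if none exists yet. */
    void replayAssignment(String memberId, Map<String, Set<Integer>> tasks) {
        members.compute(memberId, (id, old) ->
            (old == null ? Member.empty(id) : old).withAssignment(tasks));
    }

    Member member(String memberId) {
        return members.get(memberId);
    }

    public static void main(String[] args) {
        ReplaySketch state = new ReplaySketch();

        // The assignment record arrives first: the member exists but is only partially initialized.
        state.replayAssignment("m1", Map.of("subtopology-0", Set.of(0, 1)));
        System.out.println(state.member("m1").isFullyInitialized()); // prints false

        // The metadata record arrives later and completes the member.
        state.replayMetadata("m1", new MemberMetadata("/127.0.0.1", Optional.of("instance-1")));
        System.out.println(state.member("m1").isFullyInitialized()); // prints true
    }
}
```

With this shape, a caller that needs `clientHost` or `instanceId` has to go through `metadata()` and handle the empty case explicitly, instead of silently reading an empty string or a "not defined" instance ID. Whether that trade-off is worth the extra handling is the open question raised in the review.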
