dajac commented on code in PR #14323:
URL: https://github.com/apache/kafka/pull/14323#discussion_r1324405161


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+
+import java.util.Optional;
+
+/**
+ * Manages group membership for a single member.
+ * Responsible for:
+ * <li>Keeping member state</li>
+ * <li>Keeping assignment for the member</li>
+ * <li>Computing assignment for the group if the member is required to do 
so<li/>
+ */
+public interface MembershipManager {
+
+    String groupId();

Review Comment:
   nit: I would be great o add javadoc to all methods for consistency.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Optional;
+
+/**
+ * Membership manager that maintains group membership for a single member 
following the new
+ * consumer group protocol.
+ * <p/>
+ * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
+ * the member receives. It is also responsible for computing assignment for 
the group based on
+ * the metadata, if the member has been selected by the broker to do so.
+ */
+public class MembershipManagerImpl implements MembershipManager {
+
+    private final String groupId;
+    private final Optional<String> groupInstanceId;
+    private String memberId;
+    private int memberEpoch;
+    private MemberState state;
+    private AssignorSelection assignorSelection;
+
+    /**
+     * Assignment that the member received from the server and successfully 
processed.
+     */
+    private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+    /**

Review Comment:
   nit: We usually put an empty line before the javadoc. At least, we do so on 
the server side.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Optional;
+
+/**
+ * Membership manager that maintains group membership for a single member 
following the new
+ * consumer group protocol.
+ * <p/>
+ * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
+ * the member receives. It is also responsible for computing assignment for 
the group based on
+ * the metadata, if the member has been selected by the broker to do so.
+ */
+public class MembershipManagerImpl implements MembershipManager {
+
+    private final String groupId;

Review Comment:
   ditto about the javadoc.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Optional;
+
+/**
+ * Membership manager that maintains group membership for a single member 
following the new
+ * consumer group protocol.
+ * <p/>
+ * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
+ * the member receives. It is also responsible for computing assignment for 
the group based on
+ * the metadata, if the member has been selected by the broker to do so.
+ */
+public class MembershipManagerImpl implements MembershipManager {
+
+    private final String groupId;
+    private final Optional<String> groupInstanceId;
+    private String memberId;
+    private int memberEpoch;
+    private MemberState state;
+    private AssignorSelection assignorSelection;
+
+    /**
+     * Assignment that the member received from the server and successfully 
processed.
+     */
+    private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+    /**
+     * Assignment that the member received from the server but hasn't 
completely processed
+     * yet.
+     */
+    private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
targetAssignment;
+    /**
+     * Latest assignment that the member received from the server while a 
{@link #targetAssignment}
+     * was in process.
+     */
+    private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
nextTargetAssignment;
+
+    public MembershipManagerImpl(String groupId) {
+        this(groupId, null, null);
+    }
+
+    public MembershipManagerImpl(String groupId, String groupInstanceId, 
AssignorSelection assignorSelection) {
+        this.groupId = groupId;
+        this.state = MemberState.UNJOINED;
+        if (assignorSelection == null) {
+            setAssignorSelection(AssignorSelection.defaultAssignor());
+        } else {
+            setAssignorSelection(assignorSelection);
+        }
+        this.groupInstanceId = Optional.ofNullable(groupInstanceId);
+        this.targetAssignment = Optional.empty();
+        this.nextTargetAssignment = Optional.empty();
+    }
+
+    /**
+     * Update assignor selection for the member.
+     *
+     * @param assignorSelection New assignor selection
+     * @throws IllegalArgumentException If the provided assignor selection is 
null
+     */
+    public void setAssignorSelection(AssignorSelection assignorSelection) {
+        if (assignorSelection == null) {
+            throw new IllegalArgumentException("Assignor selection cannot be 
null");
+        }
+        this.assignorSelection = assignorSelection;
+    }
+
+    private void transitionTo(MemberState nextState) {
+        if (!this.state.equals(nextState) && 
!nextState.getPreviousValidStates().contains(state)) {
+            // TODO: handle invalid state transition
+            throw new RuntimeException(String.format("Invalid state transition 
from %s to %s",
+                    state, nextState));
+        }
+        this.state = nextState;
+    }
+
+    @Override
+    public String groupId() {
+        return groupId;
+    }
+
+    @Override
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+    @Override
+    public String memberId() {
+        return memberId;
+    }
+
+    @Override
+    public int memberEpoch() {
+        return memberEpoch;
+    }
+
+    @Override
+    public void updateState(ConsumerGroupHeartbeatResponseData response) {
+        if (response.errorCode() == Errors.NONE.code()) {
+            this.memberId = response.memberId();
+            this.memberEpoch = response.memberEpoch();
+            ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
+            if (assignment != null) {
+                setTargetAssignment(assignment);
+            }
+            maybeTransitionToStable();
+        } else {
+            if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || 
response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
+                resetEpoch();
+                transitionTo(MemberState.FENCED);
+            } else if (response.errorCode() == 
Errors.UNRELEASED_INSTANCE_ID.code()) {
+                transitionTo(MemberState.FAILED);
+            }
+            // TODO: handle other errors here to update state accordingly, 
mainly making the
+            //  distinction between the recoverable errors and the fatal ones, 
that should FAILED
+            //  the member
+        }
+    }
+
+    /**
+     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
+     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     */
+    private boolean maybeTransitionToStable() {
+        if (!hasPendingTargetAssignment()) {
+            transitionTo(MemberState.STABLE);
+        } else {
+            transitionTo(MemberState.RECONCILING);
+        }
+        return state.equals(MemberState.STABLE);
+    }
+
+    private void 
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
newTargetAssignment) {
+        if (!targetAssignment.isPresent()) {
+            targetAssignment = Optional.of(newTargetAssignment);
+        } else {
+            // Keep the latest next target assignment
+            nextTargetAssignment = Optional.of(newTargetAssignment);
+        }
+    }
+
+    private boolean hasPendingTargetAssignment() {
+        return targetAssignment.isPresent() || 
nextTargetAssignment.isPresent();
+    }
+

Review Comment:
   nit: Extra empty line.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Objects;
+
+/**
+ * Selection of a type of assignor used by a member to get partitions assigned 
as part of a
+ * consumer group. Currently supported assignors are:
+ * <li>SERVER assignors</li>
+ * <p/>
+ * Server assignors include a name of the server assignor selected, ex. 
uniform, range.
+ */
+public class AssignorSelection {
+    public enum Type { SERVER }
+
+    private final AssignorSelection.Type type;
+    private String serverAssignor;
+
+    private AssignorSelection(Type type, String serverAssignor) {
+        this.type = type;
+        if (type == Type.SERVER) {
+            this.serverAssignor = serverAssignor;
+        } else {
+            throw new IllegalArgumentException("Unsupported assignor type " + 
type);
+        }
+    }
+
+    public static AssignorSelection newServerAssignor(String serverAssignor) {
+        if (serverAssignor == null) {
+            throw new IllegalArgumentException("Selected server assignor name 
cannot be null");
+        }
+        if (serverAssignor.isEmpty()) {
+            throw new IllegalArgumentException("Selected server assignor name 
cannot be empty");
+        }
+        return new AssignorSelection(Type.SERVER, serverAssignor);
+    }
+
+    public static AssignorSelection defaultAssignor() {
+        // TODO: review default selection
+        return new AssignorSelection(Type.SERVER, "uniform");

Review Comment:
   By default, we should not set any assignor and let the server use the 
preferred one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to