[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332524689


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
+import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to 
a) determine, and b) modify the
+ * current set of assigned {@link TopicPartition partitions} via {@link 
SubscriptionState}. Reconciliation is a
+ * two-part process, the first being a revocation of partitions, followed by 
assignment of partitions. Each of the two
+ * steps may result in one of the following:
+ *
+ * 
+ * 
+ * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the 
set of partitions.
+ * 
+ * 
+ * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment 
have started. In practice this means
+ * that the appropriate {@link ConsumerRebalanceListener} callback 
method is being invoked.
+ * 
+ * 
+ * {@link ReconciliationResult#COMPLETED}: the {@link 
ConsumerRebalanceListener} callback method was made and
+ * the changes were applied locally.
+ * 
+ * 
+ * {@link ReconciliationResult#EXPIRED}: something happened to cause 
the operation to modify the assigned set
+ * of partitions. This could be caused by a {@link 
ConsumerRebalanceListener} callback method that takes too
+ * long to execute, interruption with the consumer group coordinator, 
or other reasons.
+ * 
+ * 
+ *
+ * The comparison against the {@link SubscriptionState#assignedPartitions() 
current set of assigned partitions} and
+ * the {@link Assignment#assignedTopicPartitions() target set of assigned 
partitions} is performed by essentially
+ * flattening the respective entries into two sets of {@link 
org.apache.kafka.common.TopicPartition partitons}
+ * which are then compared using basic {@link Set} comparisons.
+ */
+public class MemberAssignmentReconciler {
+
+/**
+ * The result of the {@link #revoke(Optional, Timer)} or {@link 
#assign(Optional, Timer)} methods being invoked.
+ */
+enum ReconciliationResult {
+NO_CHANGE,
+IN_PROGRESS,
+COMPLETED,
+EXPIRED
+}
+
+// Ugly little handler enum for making logging less verbose.
+private enum Operation {
+
+REVOKE("revoke", "revoked"), ASSIGN("assign", "assigned");
+
+private final String verbPastTense;
+
+private final String methodName;
+
+Operation(String verbPresentTense, String verbPastTense) {
+this.verbPastTense = verbPastTense;
+this.methodName = String.format("%s.onPartitions%s()", 
ConsumerRebalanceListener.class.getSimpleName(), verbPresentTense.substring(0, 
1).toUpperCase(Locale.ROOT) + 
verbPrese

[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332517141


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -237,4 +253,64 @@ public void 
updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig
 }
 maybeTransitionToStable();
 }
+
+/**
+ * Performs the reconciliation between the current topic assignment and 
the {@link Assignment} provided in the
+ * heartbeat response. Per the KIP-848 protocol, we perform the following:
+ *
+ * 
+ * Revoke partitions, if any
+ * Heartbeat to acknowledge revoked partitions
+ * Assign partitions, if any
+ * Heartbeat to acknowledge assigned partitions
+ * 
+ *
+ * TODO: What are the the state changes here?
+ * TODO: Is SubscriptionState sufficient to build the next heartbeat 
request?
+ * TODO: Where do we need to call 
ConsumerRebalanceListener.onPartitionsLost()?
+ */
+void reconcile() {
+transitionTo(MemberState.RECONCILING);
+
+Timer remainingAssignmentTime = time.timer(1);

Review Comment:
   You should be getting the rebalance timeout from the max poll interval 
configuration
   
   ```
   The rebalance timeout is provided by the member when it joins the group. It 
is basically the max poll interval configured on the client side
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332517141


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -237,4 +253,64 @@ public void 
updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig
 }
 maybeTransitionToStable();
 }
+
+/**
+ * Performs the reconciliation between the current topic assignment and 
the {@link Assignment} provided in the
+ * heartbeat response. Per the KIP-848 protocol, we perform the following:
+ *
+ * 
+ * Revoke partitions, if any
+ * Heartbeat to acknowledge revoked partitions
+ * Assign partitions, if any
+ * Heartbeat to acknowledge assigned partitions
+ * 
+ *
+ * TODO: What are the the state changes here?
+ * TODO: Is SubscriptionState sufficient to build the next heartbeat 
request?
+ * TODO: Where do we need to call 
ConsumerRebalanceListener.onPartitionsLost()?
+ */
+void reconcile() {
+transitionTo(MemberState.RECONCILING);
+
+Timer remainingAssignmentTime = time.timer(1);

Review Comment:
   Maybe add a TODO - You should be getting the rebalance timeout from the max 
poll interval.
   
   ```
   The rebalance timeout is provided by the member when it joins the group. It 
is basically the max poll interval configured on the client side
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332517141


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -237,4 +253,64 @@ public void 
updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig
 }
 maybeTransitionToStable();
 }
+
+/**
+ * Performs the reconciliation between the current topic assignment and 
the {@link Assignment} provided in the
+ * heartbeat response. Per the KIP-848 protocol, we perform the following:
+ *
+ * 
+ * Revoke partitions, if any
+ * Heartbeat to acknowledge revoked partitions
+ * Assign partitions, if any
+ * Heartbeat to acknowledge assigned partitions
+ * 
+ *
+ * TODO: What are the the state changes here?
+ * TODO: Is SubscriptionState sufficient to build the next heartbeat 
request?
+ * TODO: Where do we need to call 
ConsumerRebalanceListener.onPartitionsLost()?
+ */
+void reconcile() {
+transitionTo(MemberState.RECONCILING);
+
+Timer remainingAssignmentTime = time.timer(1);

Review Comment:
   Maybe add a TODO - this should be the rebalance timeout from the heartbeat 
response, I think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332516405


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -237,4 +253,64 @@ public void 
updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig
 }
 maybeTransitionToStable();
 }
+
+/**
+ * Performs the reconciliation between the current topic assignment and 
the {@link Assignment} provided in the
+ * heartbeat response. Per the KIP-848 protocol, we perform the following:
+ *
+ * 
+ * Revoke partitions, if any
+ * Heartbeat to acknowledge revoked partitions
+ * Assign partitions, if any
+ * Heartbeat to acknowledge assigned partitions
+ * 
+ *
+ * TODO: What are the the state changes here?
+ * TODO: Is SubscriptionState sufficient to build the next heartbeat 
request?
+ * TODO: Where do we need to call 
ConsumerRebalanceListener.onPartitionsLost()?
+ */
+void reconcile() {

Review Comment:
   I wonder if the timer should be updated here. We should be checking the 
reconciliation results on every poll, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332510414


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.common.utils.Timer;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Background thread event with a result in the form of a future, that can be 
retrieved within a
+ * timeout based on completion.
+ *
+ * @param 
+ */
+public abstract class CompletableBackgroundEvent extends BackgroundEvent {
+
+private final CompletableFuture future;
+
+protected CompletableBackgroundEvent(Type type) {
+super(type);
+this.future = new CompletableFuture<>();
+}
+
+public CompletableFuture future() {
+return future;
+}
+
+public T get(Timer timer) {
+return ConsumerUtils.getResult(future, timer);
+}
+
+public void chain(final CompletableFuture providedFuture) {
+providedFuture.whenComplete((value, exception) -> {
+if (exception != null) {
+this.future.completeExceptionally(exception);
+} else {
+this.future.complete(value);
+}
+});
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+if (!super.equals(o)) return false;
+
+CompletableBackgroundEvent that = (CompletableBackgroundEvent) o;
+
+return future.equals(that.future);
+}
+
+@Override
+public int hashCode() {
+int result = super.hashCode();
+result = 31 * result + future.hashCode();
+return result;
+}
+
+@Override
+protected String toStringBase() {
+return super.toStringBase() + ", future=" + future;
+}
+
+@Override
+public String toString() {
+return getClass().getSimpleName() + "{" +

Review Comment:
   We should use ( ) instead of { } here; I think @dajac pointed that out one 
day.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332509183


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -237,4 +253,64 @@ public void 
updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig
 }
 maybeTransitionToStable();
 }
+
+/**
+ * Performs the reconciliation between the current topic assignment and 
the {@link Assignment} provided in the
+ * heartbeat response. Per the KIP-848 protocol, we perform the following:
+ *
+ * 
+ * Revoke partitions, if any
+ * Heartbeat to acknowledge revoked partitions
+ * Assign partitions, if any
+ * Heartbeat to acknowledge assigned partitions
+ * 
+ *
+ * TODO: What are the the state changes here?
+ * TODO: Is SubscriptionState sufficient to build the next heartbeat 
request?
+ * TODO: Where do we need to call 
ConsumerRebalanceListener.onPartitionsLost()?
+ */
+void reconcile() {
+transitionTo(MemberState.RECONCILING);
+
+Timer remainingAssignmentTime = time.timer(1);
+
+// First, we need to determine if any partitions need to be revoked.
+{
+ReconciliationResult result = reconciler.revoke(targetAssignment, 
remainingAssignmentTime);
+remainingAssignmentTime.update();
+
+if (result == ReconciliationResult.COMPLETED) {
+// If we've revoked one or more partitions, we need to send an 
acknowledgement request ASAP to

Review Comment:
   I'm in a bit of a dilemma about how to correctly handle the heartbeat timer 
reset here. One option is to have this class and the heartbeat request manager 
hold references to each other, and invoke "onAssignmentRevocation" to expire 
the heartbeat timer and ensure the next poll sends the heartbeat out.  
   
   Another way to do it, which could be better, is to have the heartbeat 
request manager check a state here to determine if a heartbeat needs to be sent 
out immediately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332505218


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -237,4 +253,64 @@ public void 
updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig
 }
 maybeTransitionToStable();
 }
+
+/**
+ * Performs the reconciliation between the current topic assignment and 
the {@link Assignment} provided in the
+ * heartbeat response. Per the KIP-848 protocol, we perform the following:
+ *
+ * 
+ * Revoke partitions, if any
+ * Heartbeat to acknowledge revoked partitions
+ * Assign partitions, if any
+ * Heartbeat to acknowledge assigned partitions
+ * 
+ *
+ * TODO: What are the the state changes here?
+ * TODO: Is SubscriptionState sufficient to build the next heartbeat 
request?
+ * TODO: Where do we need to call 
ConsumerRebalanceListener.onPartitionsLost()?
+ */
+void reconcile() {
+transitionTo(MemberState.RECONCILING);
+
+Timer remainingAssignmentTime = time.timer(1);
+
+// First, we need to determine if any partitions need to be revoked.
+{
+ReconciliationResult result = reconciler.revoke(targetAssignment, 
remainingAssignmentTime);
+remainingAssignmentTime.update();
+
+if (result == ReconciliationResult.COMPLETED) {
+// If we've revoked one or more partitions, we need to send an 
acknowledgement request ASAP to
+// let the coordinator know that they've been removed locally.
+return;
+} else if (result == ReconciliationResult.EXPIRED) {
+// TODO: what do we do here?

Review Comment:
   This is the description from KIP-848:
   ```
   The chosen member is expected to complete the assignment process within the 
rebalance timeout. The time on the coordinator side starts ticking when the 
member is notified. If the process is not completed within the rebalance 
timeout, the group coordinator picks up another member to run the assignment. 
Note that the previous chosen member is not fenced here because the fencing is 
only done based on the session.
   ```
   
   It seems that you should send a heartbeat to the GC and get a fenced 
exception, which results in revoking all partitions.
   
   @dajac can provide a more accurate answer here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332495824


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
+import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to 
a) determine, and b) modify the
+ * current set of assigned {@link TopicPartition partitions} via {@link 
SubscriptionState}. Reconciliation is a
+ * two-part process, the first being a revocation of partitions, followed by 
assignment of partitions. Each of the two
+ * steps may result in one of the following:
+ *
+ * 
+ * 
+ * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the 
set of partitions.
+ * 
+ * 
+ * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment 
have started. In practice this means
+ * that the appropriate {@link ConsumerRebalanceListener} callback 
method is being invoked.
+ * 
+ * 
+ * {@link ReconciliationResult#COMPLETED}: the {@link 
ConsumerRebalanceListener} callback method was made and
+ * the changes were applied locally.
+ * 
+ * 
+ * {@link ReconciliationResult#EXPIRED}: something happened to cause 
the operation to modify the assigned set
+ * of partitions. This could be caused by a {@link 
ConsumerRebalanceListener} callback method that takes too
+ * long to execute, interruption with the consumer group coordinator, 
or other reasons.
+ * 
+ * 
+ *
+ * The comparison against the {@link SubscriptionState#assignedPartitions() 
current set of assigned partitions} and
+ * the {@link Assignment#assignedTopicPartitions() target set of assigned 
partitions} is performed by essentially
+ * flattening the respective entries into two sets of {@link 
org.apache.kafka.common.TopicPartition partitons}
+ * which are then compared using basic {@link Set} comparisons.
+ */
+public class MemberAssignmentReconciler {

Review Comment:
   One missing piece would be the heartbeat - the heartbeat needs to be sent 
out upon revoking partitions. I think we can leave a note here or implement 
this in this PR. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332491405


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
+import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to 
a) determine, and b) modify the
+ * current set of assigned {@link TopicPartition partitions} via {@link 
SubscriptionState}. Reconciliation is a
+ * two-part process, the first being a revocation of partitions, followed by 
assignment of partitions. Each of the two
+ * steps may result in one of the following:
+ *
+ * 
+ * 
+ * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the 
set of partitions.
+ * 
+ * 
+ * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment 
have started. In practice this means
+ * that the appropriate {@link ConsumerRebalanceListener} callback 
method is being invoked.
+ * 
+ * 
+ * {@link ReconciliationResult#COMPLETED}: the {@link 
ConsumerRebalanceListener} callback method was made and
+ * the changes were applied locally.
+ * 
+ * 
+ * {@link ReconciliationResult#EXPIRED}: something happened to cause 
the operation to modify the assigned set
+ * of partitions. This could be caused by a {@link 
ConsumerRebalanceListener} callback method that takes too
+ * long to execute, interruption with the consumer group coordinator, 
or other reasons.
+ * 
+ * 
+ *
+ * The comparison against the {@link SubscriptionState#assignedPartitions() 
current set of assigned partitions} and
+ * the {@link Assignment#assignedTopicPartitions() target set of assigned 
partitions} is performed by essentially
+ * flattening the respective entries into two sets of {@link 
org.apache.kafka.common.TopicPartition partitons}
+ * which are then compared using basic {@link Set} comparisons.
+ */
+public class MemberAssignmentReconciler {
+
+/**
+ * The result of the {@link #revoke(Optional, Timer)} or {@link 
#assign(Optional, Timer)} methods being invoked.
+ */
+enum ReconciliationResult {
+NO_CHANGE,
+IN_PROGRESS,
+COMPLETED,
+EXPIRED
+}
+
+// Ugly little handler enum for making logging less verbose.
+private enum Operation {
+
+REVOKE("revoke", "revoked"), ASSIGN("assign", "assigned");
+
+private final String verbPastTense;
+
+private final String methodName;
+
+Operation(String verbPresentTense, String verbPastTense) {
+this.verbPastTense = verbPastTense;
+this.methodName = String.format("%s.onPartitions%s()", 
ConsumerRebalanceListener.class.getSimpleName(), verbPresentTense.substring(0, 
1).toUpperCase(Locale.ROOT) + 
verbPrese

[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-20 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332491405


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
+import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to 
a) determine, and b) modify the
+ * current set of assigned {@link TopicPartition partitions} via {@link 
SubscriptionState}. Reconciliation is a
+ * two-part process, the first being a revocation of partitions, followed by 
assignment of partitions. Each of the two
+ * steps may result in one of the following:
+ *
+ * 
+ * 
+ * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the 
set of partitions.
+ * 
+ * 
+ * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment 
have started. In practice this means
+ * that the appropriate {@link ConsumerRebalanceListener} callback 
method is being invoked.
+ * 
+ * 
+ * {@link ReconciliationResult#COMPLETED}: the {@link 
ConsumerRebalanceListener} callback method was invoked and
+ * the changes were applied locally.
+ * 
+ * 
+ * {@link ReconciliationResult#EXPIRED}: something happened to cause 
the operation to modify the assigned set
+ * of partitions to expire. This could be caused by a {@link 
ConsumerRebalanceListener} callback method that takes too
+ * long to execute, interruption with the consumer group coordinator, 
or other reasons.
+ * 
+ * 
+ *
+ * The comparison against the {@link SubscriptionState#assignedPartitions() 
current set of assigned partitions} and
+ * the {@link Assignment#assignedTopicPartitions() target set of assigned 
partitions} is performed by essentially
+ * flattening the respective entries into two sets of {@link 
org.apache.kafka.common.TopicPartition partitions}
+ * which are then compared using basic {@link Set} comparisons.
+ */
+public class MemberAssignmentReconciler {
+
+/**
+ * The result of the {@link #revoke(Optional, Timer)} or {@link 
#assign(Optional, Timer)} methods being invoked.
+ */
+enum ReconciliationResult {
+NO_CHANGE,
+IN_PROGRESS,
+COMPLETED,
+EXPIRED
+}
+
+// Ugly little handler enum for making logging less verbose.
+private enum Operation {
+
+REVOKE("revoke", "revoked"), ASSIGN("assign", "assigned");
+
+private final String verbPastTense;
+
+private final String methodName;
+
+Operation(String verbPresentTense, String verbPastTense) {
+this.verbPastTense = verbPastTense;
+this.methodName = String.format("%s.onPartitions%s()", 
ConsumerRebalanceListener.class.getSimpleName(), verbPresentTense.substring(0, 
1).toUpperCase(Locale.ROOT) + 
verbPrese

[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1330957545


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
+import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to 
a) determine, and b) modify the
+ * current set of assigned {@link TopicPartition partitions} via {@link 
SubscriptionState}. Reconciliation is a
+ * two-part process, the first being a revocation of partitions, followed by 
assignment of partitions. Each of the two
+ * steps may result in one of the following:
+ *
+ * 
+ * 
+ * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the 
set of partitions.
+ * 
+ * 
+ * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment 
have started. In practice this means
+ * that the appropriate {@link ConsumerRebalanceListener} callback 
method is being invoked.
+ * 
+ * 
+ * {@link ReconciliationResult#COMPLETED}: the {@link 
ConsumerRebalanceListener} callback method was invoked and
+ * the changes were applied locally.
+ * 
+ * 
+ * {@link ReconciliationResult#EXPIRED}: something happened to cause 
the operation to modify the assigned set
+ * of partitions to expire. This could be caused by a {@link 
ConsumerRebalanceListener} callback method that takes too
+ * long to execute, interruption with the consumer group coordinator, 
or other reasons.
+ * 
+ * 
+ *
+ * The comparison against the {@link SubscriptionState#assignedPartitions() 
current set of assigned partitions} and
+ * the {@link Assignment#assignedTopicPartitions() target set of assigned 
partitions} is performed by essentially
+ * flattening the respective entries into two sets of {@link 
org.apache.kafka.common.TopicPartition partitions}
+ * which are then compared using basic {@link Set} comparisons.
+ */
+public class MemberAssignmentReconciler {
+
+/**
+ * The result of the {@link #revoke(Optional, Timer)} or {@link 
#assign(Optional, Timer)} methods being invoked.
+ */
+enum ReconciliationResult {
+NO_CHANGE,
+IN_PROGRESS,
+COMPLETED,
+EXPIRED
+}
+
+// Ugly little handler enum for making logging less verbose.
+private enum Operation {
+
+REVOKE("revoke", "revoked"), ASSIGN("assign", "assigned");
+
+private final String verbPastTense;
+
+private final String methodName;
+
+Operation(String verbPresentTense, String verbPastTense) {
+this.verbPastTense = verbPastTense;
+this.methodName = String.format("%s.onPartitions%s()", 
ConsumerRebalanceListener.class.getSimpleName(), verbPresentTense.substring(0, 
1).toUpperCase(Locale.ROOT) + 
verbPrese