[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332524689 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java: ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent; +import org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions; +import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to a) determine, and b) modify the + * current set of assigned {@link TopicPartition partitions} via {@link SubscriptionState}. Reconciliation is a + * two-part process, the first being a revocation of partitions, followed by assignment of partitions. Each of the two + * steps may result in one of the following: + * + * + * + * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the set of partitions. + * + * + * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment have started. 
In practice this means + * that the appropriate {@link ConsumerRebalanceListener} callback method is being invoked. + * + * + * {@link ReconciliationResult#COMPLETED}: the {@link ConsumerRebalanceListener} callback method was made and + * the changes were applied locally. + * + * + * {@link ReconciliationResult#EXPIRED}: something happened to cause the operation to modify the assigned set + * of partitions. This could be caused by a {@link ConsumerRebalanceListener} callback method that takes too + * long to execute, interruption with the consumer group coordinator, or other reasons. + * + * + * + * The comparison against the {@link SubscriptionState#assignedPartitions() current set of assigned partitions} and + * the {@link Assignment#assignedTopicPartitions() target set of assigned partitions} is performed by essentially + * flattening the respective entries into two sets of {@link org.apache.kafka.common.TopicPartition partitons} + * which are then compared using basic {@link Set} comparisons. + */ +public class MemberAssignmentReconciler { + +/** + * The result of the {@link #revoke(Optional, Timer)} or {@link #assign(Optional, Timer)} methods being invoked. + */ +enum ReconciliationResult { +NO_CHANGE, +IN_PROGRESS, +COMPLETED, +EXPIRED +} + +// Ugly little handler enum for making logging less verbose. +private enum Operation { + +REVOKE("revoke", "revoked"), ASSIGN("assign", "assigned"); + +private final String verbPastTense; + +private final String methodName; + +Operation(String verbPresentTense, String verbPastTense) { +this.verbPastTense = verbPastTense; +this.methodName = String.format("%s.onPartitions%s()", ConsumerRebalanceListener.class.getSimpleName(), verbPresentTense.substring(0, 1).toUpperCase(Locale.ROOT) + verbPrese
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332517141 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -237,4 +253,64 @@ public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig } maybeTransitionToStable(); } + +/** + * Performs the reconciliation between the current topic assignment and the {@link Assignment} provided in the + * heartbeat response. Per the KIP-848 protocol, we perform the following: + * + * + * Revoke partitions, if any + * Heartbeat to acknowledge revoked partitions + * Assign partitions, if any + * Heartbeat to acknowledge assigned partitions + * + * + * TODO: What are the the state changes here? + * TODO: Is SubscriptionState sufficient to build the next heartbeat request? + * TODO: Where do we need to call ConsumerRebalanceListener.onPartitionsLost()? + */ +void reconcile() { +transitionTo(MemberState.RECONCILING); + +Timer remainingAssignmentTime = time.timer(1); Review Comment: You should be getting the rebalance timeout from the max poll interval configuration ``` The rebalance timeout is provided by the member when it joins the group. It is basically the max poll interval configured on the client side ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332517141 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -237,4 +253,64 @@ public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig } maybeTransitionToStable(); } + +/** + * Performs the reconciliation between the current topic assignment and the {@link Assignment} provided in the + * heartbeat response. Per the KIP-848 protocol, we perform the following: + * + * + * Revoke partitions, if any + * Heartbeat to acknowledge revoked partitions + * Assign partitions, if any + * Heartbeat to acknowledge assigned partitions + * + * + * TODO: What are the the state changes here? + * TODO: Is SubscriptionState sufficient to build the next heartbeat request? + * TODO: Where do we need to call ConsumerRebalanceListener.onPartitionsLost()? + */ +void reconcile() { +transitionTo(MemberState.RECONCILING); + +Timer remainingAssignmentTime = time.timer(1); Review Comment: Maybe add a TODO - You should be getting the rebalance timeout from the max poll interval. ``` The rebalance timeout is provided by the member when it joins the group. It is basically the max poll interval configured on the client side ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332517141 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -237,4 +253,64 @@ public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig } maybeTransitionToStable(); } + +/** + * Performs the reconciliation between the current topic assignment and the {@link Assignment} provided in the + * heartbeat response. Per the KIP-848 protocol, we perform the following: + * + * + * Revoke partitions, if any + * Heartbeat to acknowledge revoked partitions + * Assign partitions, if any + * Heartbeat to acknowledge assigned partitions + * + * + * TODO: What are the the state changes here? + * TODO: Is SubscriptionState sufficient to build the next heartbeat request? + * TODO: Where do we need to call ConsumerRebalanceListener.onPartitionsLost()? + */ +void reconcile() { +transitionTo(MemberState.RECONCILING); + +Timer remainingAssignmentTime = time.timer(1); Review Comment: Maybe add a TODO - I think this should be the rebalance timeout from the heartbeat (HB) response? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332516405 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -237,4 +253,64 @@ public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig } maybeTransitionToStable(); } + +/** + * Performs the reconciliation between the current topic assignment and the {@link Assignment} provided in the + * heartbeat response. Per the KIP-848 protocol, we perform the following: + * + * + * Revoke partitions, if any + * Heartbeat to acknowledge revoked partitions + * Assign partitions, if any + * Heartbeat to acknowledge assigned partitions + * + * + * TODO: What are the the state changes here? + * TODO: Is SubscriptionState sufficient to build the next heartbeat request? + * TODO: Where do we need to call ConsumerRebalanceListener.onPartitionsLost()? + */ +void reconcile() { Review Comment: I wonder whether the timer should be updated here. We should be checking the reconciliation results on every poll, no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332510414 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; +import org.apache.kafka.common.utils.Timer; + +import java.util.concurrent.CompletableFuture; + +/** + * Background thread event with a result in the form of a future, that can be retrieved within a + * timeout based on completion. 
+ * + * @param + */ +public abstract class CompletableBackgroundEvent extends BackgroundEvent { + +private final CompletableFuture future; + +protected CompletableBackgroundEvent(Type type) { +super(type); +this.future = new CompletableFuture<>(); +} + +public CompletableFuture future() { +return future; +} + +public T get(Timer timer) { +return ConsumerUtils.getResult(future, timer); +} + +public void chain(final CompletableFuture providedFuture) { +providedFuture.whenComplete((value, exception) -> { +if (exception != null) { +this.future.completeExceptionally(exception); +} else { +this.future.complete(value); +} +}); +} + +@Override +public boolean equals(Object o) { +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; +if (!super.equals(o)) return false; + +CompletableBackgroundEvent that = (CompletableBackgroundEvent) o; + +return future.equals(that.future); +} + +@Override +public int hashCode() { +int result = super.hashCode(); +result = 31 * result + future.hashCode(); +return result; +} + +@Override +protected String toStringBase() { +return super.toStringBase() + ", future=" + future; +} + +@Override +public String toString() { +return getClass().getSimpleName() + "{" + Review Comment: we should use ( ) I think @dajac pointed out one day. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332509183 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -237,4 +253,64 @@ public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig } maybeTransitionToStable(); } + +/** + * Performs the reconciliation between the current topic assignment and the {@link Assignment} provided in the + * heartbeat response. Per the KIP-848 protocol, we perform the following: + * + * + * Revoke partitions, if any + * Heartbeat to acknowledge revoked partitions + * Assign partitions, if any + * Heartbeat to acknowledge assigned partitions + * + * + * TODO: What are the the state changes here? + * TODO: Is SubscriptionState sufficient to build the next heartbeat request? + * TODO: Where do we need to call ConsumerRebalanceListener.onPartitionsLost()? + */ +void reconcile() { +transitionTo(MemberState.RECONCILING); + +Timer remainingAssignmentTime = time.timer(1); + +// First, we need to determine if any partitions need to be revoked. +{ +ReconciliationResult result = reconciler.revoke(targetAssignment, remainingAssignmentTime); +remainingAssignmentTime.update(); + +if (result == ReconciliationResult.COMPLETED) { +// If we've revoked one or more partitions, we need to send an acknowledgement request ASAP to Review Comment: I'm in a bit of a dilemma about how to correctly handle the heartbeat timer reset here. One option is to have this class and the heartbeat request manager hold references to each other, and invoke "onAssignmentRevocation" to expire the heartbeat timer and ensure the next poll will have the heartbeat sent out. Another way to do it, which could be better, is to have the heartbeat request manager check a state here to determine if a heartbeat needs to be sent out immediately. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332505218 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -237,4 +253,64 @@ public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig } maybeTransitionToStable(); } + +/** + * Performs the reconciliation between the current topic assignment and the {@link Assignment} provided in the + * heartbeat response. Per the KIP-848 protocol, we perform the following: + * + * + * Revoke partitions, if any + * Heartbeat to acknowledge revoked partitions + * Assign partitions, if any + * Heartbeat to acknowledge assigned partitions + * + * + * TODO: What are the the state changes here? + * TODO: Is SubscriptionState sufficient to build the next heartbeat request? + * TODO: Where do we need to call ConsumerRebalanceListener.onPartitionsLost()? + */ +void reconcile() { +transitionTo(MemberState.RECONCILING); + +Timer remainingAssignmentTime = time.timer(1); + +// First, we need to determine if any partitions need to be revoked. +{ +ReconciliationResult result = reconciler.revoke(targetAssignment, remainingAssignmentTime); +remainingAssignmentTime.update(); + +if (result == ReconciliationResult.COMPLETED) { +// If we've revoked one or more partitions, we need to send an acknowledgement request ASAP to +// let the coordinator know that they've been removed locally. +return; +} else if (result == ReconciliationResult.EXPIRED) { +// TODO: what do we do here? Review Comment: This is the description from kip-848 ``` The chosen member is expected to complete the assignment process within the rebalance timeout. The time on the coordinator side starts ticking when the member is notified. If the process is not completed within the rebalance timeout, the group coordinator picks up another member to run the assignment. 
Note that the previous chosen member is not fenced here because the fencing is only done based on the session. ``` It seems that you should send a heartbeat to the group coordinator (GC) and receive a fenced exception, which would result in revoking all partitions. @dajac can provide a more accurate answer here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332495824 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java: ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent; +import org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions; +import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to a) determine, and b) modify the + * current set of assigned {@link TopicPartition partitions} via {@link SubscriptionState}. Reconciliation is a + * two-part process, the first being a revocation of partitions, followed by assignment of partitions. Each of the two + * steps may result in one of the following: + * + * + * + * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the set of partitions. + * + * + * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment have started. 
In practice this means + * that the appropriate {@link ConsumerRebalanceListener} callback method is being invoked. + * + * + * {@link ReconciliationResult#COMPLETED}: the {@link ConsumerRebalanceListener} callback method was made and + * the changes were applied locally. + * + * + * {@link ReconciliationResult#EXPIRED}: something happened to cause the operation to modify the assigned set + * of partitions. This could be caused by a {@link ConsumerRebalanceListener} callback method that takes too + * long to execute, interruption with the consumer group coordinator, or other reasons. + * + * + * + * The comparison against the {@link SubscriptionState#assignedPartitions() current set of assigned partitions} and + * the {@link Assignment#assignedTopicPartitions() target set of assigned partitions} is performed by essentially + * flattening the respective entries into two sets of {@link org.apache.kafka.common.TopicPartition partitons} + * which are then compared using basic {@link Set} comparisons. + */ +public class MemberAssignmentReconciler { Review Comment: One missing piece would be the heartbeat - the heartbeat needs to be sent out upon revoking partitions. I think we can leave a note here or implement this in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332491405 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java: ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent; +import org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions; +import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to a) determine, and b) modify the + * current set of assigned {@link TopicPartition partitions} via {@link SubscriptionState}. Reconciliation is a + * two-part process, the first being a revocation of partitions, followed by assignment of partitions. Each of the two + * steps may result in one of the following: + * + * + * + * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the set of partitions. + * + * + * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment have started. 
In practice this means + * that the appropriate {@link ConsumerRebalanceListener} callback method is being invoked. + * + * + * {@link ReconciliationResult#COMPLETED}: the {@link ConsumerRebalanceListener} callback method was made and + * the changes were applied locally. + * + * + * {@link ReconciliationResult#EXPIRED}: something happened to cause the operation to modify the assigned set + * of partitions. This could be caused by a {@link ConsumerRebalanceListener} callback method that takes too + * long to execute, interruption with the consumer group coordinator, or other reasons. + * + * + * + * The comparison against the {@link SubscriptionState#assignedPartitions() current set of assigned partitions} and + * the {@link Assignment#assignedTopicPartitions() target set of assigned partitions} is performed by essentially + * flattening the respective entries into two sets of {@link org.apache.kafka.common.TopicPartition partitons} + * which are then compared using basic {@link Set} comparisons. + */ +public class MemberAssignmentReconciler { + +/** + * The result of the {@link #revoke(Optional, Timer)} or {@link #assign(Optional, Timer)} methods being invoked. + */ +enum ReconciliationResult { +NO_CHANGE, +IN_PROGRESS, +COMPLETED, +EXPIRED +} + +// Ugly little handler enum for making logging less verbose. +private enum Operation { + +REVOKE("revoke", "revoked"), ASSIGN("assign", "assigned"); + +private final String verbPastTense; + +private final String methodName; + +Operation(String verbPresentTense, String verbPastTense) { +this.verbPastTense = verbPastTense; +this.methodName = String.format("%s.onPartitions%s()", ConsumerRebalanceListener.class.getSimpleName(), verbPresentTense.substring(0, 1).toUpperCase(Locale.ROOT) + verbPrese
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332491405 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java: ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent; +import org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions; +import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to a) determine, and b) modify the + * current set of assigned {@link TopicPartition partitions} via {@link SubscriptionState}. Reconciliation is a + * two-part process, the first being a revocation of partitions, followed by assignment of partitions. Each of the two + * steps may result in one of the following: + * + * + * + * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the set of partitions. + * + * + * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment have started. 
In practice this means + * that the appropriate {@link ConsumerRebalanceListener} callback method is being invoked. + * + * + * {@link ReconciliationResult#COMPLETED}: the {@link ConsumerRebalanceListener} callback method was made and + * the changes were applied locally. + * + * + * {@link ReconciliationResult#EXPIRED}: something happened to cause the operation to modify the assigned set + * of partitions. This could be caused by a {@link ConsumerRebalanceListener} callback method that takes too + * long to execute, interruption with the consumer group coordinator, or other reasons. + * + * + * + * The comparison against the {@link SubscriptionState#assignedPartitions() current set of assigned partitions} and + * the {@link Assignment#assignedTopicPartitions() target set of assigned partitions} is performed by essentially + * flattening the respective entries into two sets of {@link org.apache.kafka.common.TopicPartition partitions} + * which are then compared using basic {@link Set} comparisons. + */ +public class MemberAssignmentReconciler { + +/** + * The result of the {@link #revoke(Optional, Timer)} or {@link #assign(Optional, Timer)} methods being invoked. + */ +enum ReconciliationResult { +NO_CHANGE, +IN_PROGRESS, +COMPLETED, +EXPIRED +} + +// Ugly little handler enum for making logging less verbose. +private enum Operation { + +REVOKE("revoke", "revoked"), ASSIGN("assign", "assigned"); + +private final String verbPastTense; + +private final String methodName; + +Operation(String verbPresentTense, String verbPastTense) { +this.verbPastTense = verbPastTense; +this.methodName = String.format("%s.onPartitions%s()", ConsumerRebalanceListener.class.getSimpleName(), verbPresentTense.substring(0, 1).toUpperCase(Locale.ROOT) + verbPrese
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1330957545 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java: ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent; +import org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions; +import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to a) determine, and b) modify the + * current set of assigned {@link TopicPartition partitions} via {@link SubscriptionState}. Reconciliation is a + * two-part process, the first being a revocation of partitions, followed by assignment of partitions. Each of the two + * steps may result in one of the following: + * + * + * + * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the set of partitions. + * + * + * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment have started. 
In practice this means + * that the appropriate {@link ConsumerRebalanceListener} callback method is being invoked. + * + * + * {@link ReconciliationResult#COMPLETED}: the {@link ConsumerRebalanceListener} callback method was made and + * the changes were applied locally. + * + * + * {@link ReconciliationResult#EXPIRED}: something happened to cause the operation to modify the assigned set + * of partitions. This could be caused by a {@link ConsumerRebalanceListener} callback method that takes too + * long to execute, interruption with the consumer group coordinator, or other reasons. + * + * + * + * The comparison against the {@link SubscriptionState#assignedPartitions() current set of assigned partitions} and + * the {@link Assignment#assignedTopicPartitions() target set of assigned partitions} is performed by essentially + * flattening the respective entries into two sets of {@link org.apache.kafka.common.TopicPartition partitions} + * which are then compared using basic {@link Set} comparisons. + */ +public class MemberAssignmentReconciler { + +/** + * The result of the {@link #revoke(Optional, Timer)} or {@link #assign(Optional, Timer)} methods being invoked. + */ +enum ReconciliationResult { +NO_CHANGE, +IN_PROGRESS, +COMPLETED, +EXPIRED +} + +// Ugly little handler enum for making logging less verbose. +private enum Operation { + +REVOKE("revoke", "revoked"), ASSIGN("assign", "assigned"); + +private final String verbPastTense; + +private final String methodName; + +Operation(String verbPresentTense, String verbPastTense) { +this.verbPastTense = verbPastTense; +this.methodName = String.format("%s.onPartitions%s()", ConsumerRebalanceListener.class.getSimpleName(), verbPresentTense.substring(0, 1).toUpperCase(Locale.ROOT) + verbPrese