Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru merged PR #15511:
URL: https://github.com/apache/kafka/pull/15511

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on PR #15511:
URL: https://github.com/apache/kafka/pull/15511#issuecomment-2000323516

Went over the latest updates, LGTM. Thanks!
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526716168

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -565,6 +565,58 @@ public void testSameAssignmentReconciledAgainWhenFenced() {
         assertEquals(toTopicIdPartitionMap(assignment1), membershipManager.currentAssignment().partitions);
     }
+    /**
+     * This is the case where we receive a new assignment while reconciling an existing one. The intermediate assignment
+     * is not applied, and a new assignment containing the same partitions is received and reconciled. In all assignments,
+     * one topic is not resolvable.
+     *
+     * We need to make sure that the last assignment is acked and applied, even though the set of partitions does not change.
+     * In this case, no rebalance listeners are run.
+     */
+    @Test
+    public void testSameAssignmentReconciledAgainWithMissingTopic() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topic1 = Uuid.randomUuid();
+        Uuid topic2 = Uuid.randomUuid();
+        final Assignment assignment1 = new ConsumerGroupHeartbeatResponseData.Assignment()
+            .setTopicPartitions(Arrays.asList(
+                new TopicPartitions().setTopicId(topic1).setPartitions(Collections.singletonList(0)),
+                new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+            ));
+        final Assignment assignment2 = new ConsumerGroupHeartbeatResponseData.Assignment()
+            .setTopicPartitions(Arrays.asList(
+                new TopicPartitions().setTopicId(topic1).setPartitions(Arrays.asList(0, 1)),
+                new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+            ));
+        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1, "topic1"));
+
+        // Receive assignment - full reconciliation triggered
+        // stay in RECONCILING state, since an unresolved topic is assigned
+        membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1).data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        membershipManager.poll(time.milliseconds());
+        verifyReconciliationTriggeredAndCompleted(membershipManager,
+            Collections.singletonList(new TopicIdPartition(topic1, new TopicPartition("topic1", 0)))
+        );
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        clearInvocations(membershipManager);
+
+        // Receive extended assignment - assignment received but no reconciliation triggered
+        membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2).data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        verifyReconciliationNotTriggered(membershipManager);

Review Comment:
Exactly
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526715547

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -565,6 +565,58 @@ public void testSameAssignmentReconciledAgainWhenFenced() {
         assertEquals(toTopicIdPartitionMap(assignment1), membershipManager.currentAssignment().partitions);
     }
+    /**
+     * This is the case where we receive a new assignment while reconciling an existing one. The intermediate assignment
+     * is not applied, and a new assignment containing the same partitions is received and reconciled. In all assignments,
+     * one topic is not resolvable.
+     *
+     * We need to make sure that the last assignment is acked and applied, even though the set of partitions does not change.
+     * In this case, no rebalance listeners are run.
+     */
+    @Test
+    public void testSameAssignmentReconciledAgainWithMissingTopic() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topic1 = Uuid.randomUuid();
+        Uuid topic2 = Uuid.randomUuid();
+        final Assignment assignment1 = new ConsumerGroupHeartbeatResponseData.Assignment()
+            .setTopicPartitions(Arrays.asList(
+                new TopicPartitions().setTopicId(topic1).setPartitions(Collections.singletonList(0)),
+                new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+            ));
+        final Assignment assignment2 = new ConsumerGroupHeartbeatResponseData.Assignment()
+            .setTopicPartitions(Arrays.asList(
+                new TopicPartitions().setTopicId(topic1).setPartitions(Arrays.asList(0, 1)),
+                new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+            ));
+        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1, "topic1"));
+
+        // Receive assignment - full reconciliation triggered
+        // stay in RECONCILING state, since an unresolved topic is assigned
+        membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1).data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        membershipManager.poll(time.milliseconds());
+        verifyReconciliationTriggeredAndCompleted(membershipManager,
+            Collections.singletonList(new TopicIdPartition(topic1, new TopicPartition("topic1", 0)))
+        );
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        clearInvocations(membershipManager);
+
+        // Receive extended assignment - assignment received but no reconciliation triggered
+        membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2).data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        verifyReconciliationNotTriggered(membershipManager);

Review Comment:
Just to double check: it is intentional here that you're not calling poll, right? (That's why a reconciliation is not triggered here; otherwise I would expect a reconciliation to be triggered for t1-1.) I guess this is what you refer to as the "intermediate" assignment (an assignment received and removed before a call to poll).
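A toy model of the "intermediate" assignment discussed above. It assumes, for illustration only, that a heartbeat response merely records the latest target with a bumped local epoch and that reconciliation runs only from `poll()`. The class and method names are hypothetical and are not the `MembershipManagerImpl` API, and partitions are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model (hypothetical names, not Kafka's API): heartbeat responses only record the
// latest target assignment, and reconciliation happens on poll().
class IntermediateAssignmentModel {
    private long localEpoch = 0;              // bumped for every target received
    private Set<String> target = Set.of();    // latest target from the coordinator
    private long targetEpoch = 0;
    private Set<String> current = Set.of();   // last reconciled assignment
    private long currentEpoch = 0;
    final List<Long> reconciledEpochs = new ArrayList<>();

    void onHeartbeatSuccess(Set<String> assignment) {
        target = assignment;
        targetEpoch = ++localEpoch;
    }

    // Reconciles whatever target is pending now; targets replaced in between are never applied.
    void poll() {
        if (targetEpoch != currentEpoch) {
            current = target;
            currentEpoch = targetEpoch;
            reconciledEpochs.add(currentEpoch);
        }
    }

    Set<String> current() {
        return current;
    }

    static IntermediateAssignmentModel demo() {
        IntermediateAssignmentModel m = new IntermediateAssignmentModel();
        m.onHeartbeatSuccess(Set.of("t1-0", "t2-0"));         // local epoch 1
        m.poll();                                              // reconciles epoch 1
        m.onHeartbeatSuccess(Set.of("t1-0", "t1-1", "t2-0")); // local epoch 2, replaced before any poll
        m.onHeartbeatSuccess(Set.of("t1-0", "t2-0"));         // local epoch 3, same partitions as epoch 1
        m.poll();                                              // reconciles epoch 3; epoch 2 is skipped
        return m;
    }
}
```

In `demo()`, the reconciled epochs are 1 and 3. Epoch 2 is never applied, and epoch 3 is still reconciled (and would be acked) even though its partitions equal those of epoch 1, because the local epoch differs.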
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526581796

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -889,42 +894,39 @@ private void transitionToStale() {
      */
     void maybeReconcile() {
         if (targetAssignmentReconciled()) {
-            log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+            log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
                     "current assignment.");
             return;
         }
         if (reconciliationInProgress) {
-            log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+            log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
                     currentTargetAssignment + " will be handled in the next reconciliation loop.");
             return;
         }
         // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
         // if some topic IDs are not resolvable.
         SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+        final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
+
+        if (!currentAssignment.isNone() &&
+                resolvedAssignment.partitions.equals(currentAssignment.partitions)) {

Review Comment:
Done

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -889,42 +894,39 @@ private void transitionToStale() {
      */
     void maybeReconcile() {
         if (targetAssignmentReconciled()) {
-            log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+            log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
                     "current assignment.");
             return;
         }
         if (reconciliationInProgress) {
-            log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+            log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
                     currentTargetAssignment + " will be handled in the next reconciliation loop.");
             return;
         }
         // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
         // if some topic IDs are not resolvable.
         SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+        final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
+
+        if (!currentAssignment.isNone() &&
+                resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+            log.debug("There are unresolved partitions, and the resolvable fragment of the target assignment {} is equal to the current "

Review Comment:
Done

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -970,7 +973,11 @@ void maybeReconcile() {
                 log.debug("Auto-commit before reconciling new assignment completed successfully.");
             }
-            revokeAndAssign(assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+            revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+        }).whenComplete((__, error) -> {

Review Comment:
Done - but it requires a `return null` :/

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -76,15 +83,14 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 public class HeartbeatRequestManagerTest {
-    private long retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS;
-    private int heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
-    private int maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL_MS;
-    private long retryBackoffMaxMs = DEFAULT_RETRY_BACKOFF_MAX_MS;
+    private final long retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS;
+    private final int heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
+    private final int maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL_MS;
+    private final long retryBackoffMaxMs = DEFAULT_RETRY_BACKOFF_MAX_MS;

Review Comment:
Done

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ##
@@
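This thread discusses replacing `whenComplete` with `exceptionally`. The sketch below shows why that switch needs a `return null`. `CompletableFuture.exceptionally` takes a `Function<Throwable, ? extends T>`, so for a `CompletableFuture<Void>` the handler must return a value, and `null` is the only `Void` value. The helper names are hypothetical, and a `Consumer<Throwable>` stands in for the logger.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical helpers contrasting the two ways of logging a failed CompletableFuture<Void>.
class ExceptionallySketch {
    // exceptionally(): the handler maps the failure to a result, so it must return a Void, i.e. null.
    // The returned future then completes normally.
    static CompletableFuture<Void> withExceptionally(CompletableFuture<Void> step, Consumer<Throwable> log) {
        return step.exceptionally(error -> {
            log.accept(error);
            return null; // required: the handler is a Function<Throwable, ? extends Void>
        });
    }

    // whenComplete(): the handler only observes the outcome; the returned future keeps the failure.
    static CompletableFuture<Void> withWhenComplete(CompletableFuture<Void> step, Consumer<Throwable> log) {
        return step.whenComplete((__, error) -> {
            if (error != null) {
                log.accept(error);
            }
        });
    }
}
```

Both variants log the error. They differ in what they hand back: the `exceptionally` future recovers (completes with `null`), whereas the `whenComplete` future still completes exceptionally. That distinction matters only if some caller later inspects the returned future.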
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on PR #15511:
URL: https://github.com/apache/kafka/pull/15511#issuecomment-2000134008

Thanks for the updates @lucasbru, took another look, LGTM considering the latest nits. Just for the record, I will follow up with [KAFKA-16375](https://issues.apache.org/jira/browse/) on how ongoing reconciliations are discarded after rejoining.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526609264

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -29,6 +29,8 @@ import org.apache.kafka.common.TopicPartition;

Review Comment:
Agree it's not covered. (Happy to take that separately myself, right after this, if it helps @lucasbru... knowing you're getting busy very soon)
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526609264

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -29,6 +29,8 @@ import org.apache.kafka.common.TopicPartition;

Review Comment:
Agree it's not covered. (Happy to take that separately right after this, if it helps @lucasbru... knowing you're getting busy very soon)
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526609264

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -29,6 +29,8 @@ import org.apache.kafka.common.TopicPartition;

Review Comment:
Agree it's not covered. (Happy to take that separately right after this, if you prefer @lucasbru... knowing you're getting busy very soon)
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526605829

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -976,6 +974,10 @@ void maybeReconcile() {
             }
             revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+        }).whenComplete((__, error) -> {
+            if (error != null) {
+                log.error("Reconciliation failed.", error);

Review Comment:
Oh I see now, thanks for the explanation.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526605292

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -911,9 +911,13 @@ void maybeReconcile() {
         SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
         final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
-        if (resolvedAssignment.equals(currentAssignment)) {
-            log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " +
-                    "is equal to the member current assignment.", resolvedAssignment);
+        if (currentAssignment != LocalAssignmentImpl.NONE &&
+                resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
+                resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+            log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +

Review Comment:
Agree on the need to update the assignment. Doing only what's needed (vs. doing everything the reconciliation does) is exactly what I was pushing for, so this sounds good to me. Thanks!
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
dajac commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526420387

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -970,7 +973,11 @@ void maybeReconcile() {
                 log.debug("Auto-commit before reconciling new assignment completed successfully.");
             }
-            revokeAndAssign(assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+            revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+        }).whenComplete((__, error) -> {

Review Comment:
nit: Could we use `exceptionally`?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -552,26 +553,23 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
         // ServerAssignor - only sent if has changed since the last heartbeat
         this.membershipManager.serverAssignor().ifPresent(serverAssignor -> {
-            if (!serverAssignor.equals(sentFields.serverAssignor)) {
+            if (membershipManager.memberEpoch() == 0 || !serverAssignor.equals(sentFields.serverAssignor)) {
                 data.setServerAssignor(serverAssignor);
                 sentFields.serverAssignor = serverAssignor;
             }
         });
         // ClientAssignors - not supported yet
-        // TopicPartitions - only sent if it has changed since the last heartbeat. Note that
-        // the string consists of just the topic ID and the partitions. When an assignment is
-        // received, we might not yet know the topic name, and then it is learnt subsequently
-        // by a metadata update.
-        TreeSet<String> assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
-            .map(entry -> entry.getKey() + "-" + entry.getValue())
-            .collect(Collectors.toCollection(TreeSet::new));
-        if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+        // TopicPartitions - sent with the first heartbeat after a new assignment from the server was
+        // reconciled. This is ensured by resending the topic partitions whenever the local assignment,
+        // including its local epoch is changed (although the local epoch is not sent in the heartbeat).
+        LocalAssignment local = membershipManager.currentAssignment();
+        if (!local.equals(sentFields.localAssignment)) {

Review Comment:
Do we need `membershipManager.memberEpoch() == 0` here too? I suppose that it works because the current assignment is reset in the membership manager, but it may be better to add it here for consistency.

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -29,6 +29,8 @@ import org.apache.kafka.common.TopicPartition;

Review Comment:
I wonder if we could add the example that we discussed offline as a test:
```
0: [T1, T2] -- T2 unresolved (only T1 is reconciled)
1: [T1, T2, T3] -- T2 unresolved (skipped, since reconciliation in progress)
2: [T1, T2] -- T2 unresolved
```
What do you think?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ##
@@ -185,4 +190,89 @@ public interface MembershipManager extends RequestManager {
      * releasing its assignment. This is expected to be used when the poll timer is reset.
      */
     void maybeRejoinStaleMember();
+
+    /**
+     * A data structure to represent the current assignment, and current target assignment of a member in a consumer group.
+     *
+     * Besides the assigned partitions, it contains a local epoch that is bumped whenever the assignment changes, to ensure
+     * that two assignments with the same partitions but different local epochs are not considered equal.
+     */
+    final class LocalAssignment {
+
+        public static final long NONE_EPOCH = -1;
+
+        public static final LocalAssignment NONE = new LocalAssignment(NONE_EPOCH, Collections.emptyMap());
+
+        public final long localEpoch;
+
+        public final Map<Uuid, SortedSet<Integer>> partitions;
+
+        public LocalAssignment(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions) {
+            this.localEpoch = localEpoch;
+            this.partitions = partitions;
+            if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
+                throw new IllegalArgumentException("Local epoch must be set if there are partitions");
+            }
+        }
+
+        public LocalAssignment(long localEpoch, SortedSet<TopicIdPartition> topicIdPartitions) {
+            this.localEpoch = localEpoch;
+            this.partitions = new HashMap<>();
+            if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) {
+                throw new IllegalArgumentException("Local epoch must be
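Below is a simplified sketch of the `LocalAssignment` idea quoted above. To keep it self-contained, it assumes `String` topic IDs instead of `Uuid`, and the class name `LocalAssignmentSketch` is hypothetical. Because `equals` includes the local epoch, a "resend if changed" check such as the quoted `!local.equals(sentFields.localAssignment)` fires after every new local epoch, even when the partitions are unchanged.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeSet;

// Simplified, hypothetical version of a local assignment: String topic IDs stand in for Uuid.
final class LocalAssignmentSketch {
    static final long NONE_EPOCH = -1;
    static final LocalAssignmentSketch NONE = new LocalAssignmentSketch(NONE_EPOCH, Collections.emptyMap());

    final long localEpoch;
    final Map<String, SortedSet<Integer>> partitions;

    LocalAssignmentSketch(long localEpoch, Map<String, SortedSet<Integer>> partitions) {
        // An assignment with partitions must carry a real local epoch.
        if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
            throw new IllegalArgumentException("Local epoch must be set if there are partitions");
        }
        this.localEpoch = localEpoch;
        this.partitions = partitions;
    }

    boolean isNone() {
        return localEpoch == NONE_EPOCH;
    }

    // Equality covers the epoch too: same partitions with a different epoch are NOT equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof LocalAssignmentSketch)) return false;
        LocalAssignmentSketch that = (LocalAssignmentSketch) o;
        return localEpoch == that.localEpoch && partitions.equals(that.partitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(localEpoch, partitions);
    }

    // Convenience factory for a single-topic partition map (illustration only).
    static Map<String, SortedSet<Integer>> single(String topicId, Integer... ps) {
        SortedSet<Integer> set = new TreeSet<>(Arrays.asList(ps));
        return Collections.singletonMap(topicId, set);
    }
}
```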
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526349331

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
         // MemberEpoch - always sent
         data.setMemberEpoch(membershipManager.memberEpoch());
-        // InstanceId - only sent if has changed since the last heartbeat
-        // Always send when leaving the group as a static member
-        membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-            if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-                data.setInstanceId(groupInstanceId);
-                sentFields.instanceId = groupInstanceId;
-            }
-        });
+        // InstanceId - always send when leaving the group as a static member
+        membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
-        // RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-        if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+        // RebalanceTimeoutMs - only sent when joining
+        if (membershipManager.memberEpoch() == 0) {

Review Comment:
Note that this was also implemented independently in https://github.com/apache/kafka/pull/15401, which I had seen but forgotten about. So I will rebase, and the epoch == 0 conditions will come from the other PR (in the form of an equivalent state == JOINING condition).
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526346873

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -911,9 +911,13 @@ void maybeReconcile() {
         SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
         final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
-        if (resolvedAssignment.equals(currentAssignment)) {
-            log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " +
-                    "is equal to the member current assignment.", resolvedAssignment);
+        if (currentAssignment != LocalAssignmentImpl.NONE &&
+                resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
+                resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+            log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +

Review Comment:
@dajac and I discussed various options for implementing the short-circuiting. In the end, we decided to skip only the callback invocations, not the entire reconciliation, when the resolved assignment is equal to the current assignment but has a different epoch. We still need to update the current assignment with the resolved assignment and the new epoch in order to trigger the "ack", and to transition to sending the ack as well.
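The following hypothetical sketch models the short-circuit agreed on above. It replays the example dajac proposed as a test (local epochs 0, 1, 2, where T2 never resolves, so only T1 is resolvable). Partitions are plain strings, and the class is not Kafka's `MembershipManagerImpl`; it only models the decision. When the resolvable partitions equal the current ones but the local epoch is new, the callbacks are skipped, yet the new epoch is still adopted so that the next heartbeat acks the assignment.

```java
import java.util.Set;

// Hypothetical model of the short-circuit decision; not Kafka's MembershipManagerImpl.
class ReconcileShortcutSketch {
    enum Action { NOTHING_TO_DO, UPDATE_EPOCH_AND_ACK_ONLY, FULL_RECONCILIATION }

    long currentEpoch;
    Set<String> currentPartitions;
    boolean ackPending;

    ReconcileShortcutSketch(long currentEpoch, Set<String> currentPartitions) {
        this.currentEpoch = currentEpoch;
        this.currentPartitions = currentPartitions;
    }

    // targetEpoch: local epoch of the current target; resolved: its resolvable partitions.
    Action maybeReconcile(long targetEpoch, Set<String> resolved) {
        boolean samePartitions = resolved.equals(currentPartitions);
        if (samePartitions && targetEpoch == currentEpoch) {
            return Action.NOTHING_TO_DO;             // this target was already reconciled
        }
        currentEpoch = targetEpoch;                  // adopting the new epoch is what triggers the ack
        ackPending = true;
        if (samePartitions) {
            return Action.UPDATE_EPOCH_AND_ACK_ONLY; // no rebalance callbacks are run
        }
        currentPartitions = resolved;                // revocation/assignment callbacks would run here
        return Action.FULL_RECONCILIATION;
    }
}
```

Replaying the example: epoch 0 (`[T1, T2]`, resolved `[T1]`) triggers a full reconciliation. Epoch 1 is replaced while that reconciliation is in progress and is never seen. Epoch 2 (`[T1, T2]`, resolved `[T1]`) only updates the epoch and acks. A further poll at epoch 2 does nothing.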
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525940513

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -911,9 +911,13 @@ void maybeReconcile() {
         SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
         final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
-        if (resolvedAssignment.equals(currentAssignment)) {
-            log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " +
-                    "is equal to the member current assignment.", resolvedAssignment);
+        if (currentAssignment != LocalAssignmentImpl.NONE &&
+                resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
+                resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+            log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +

Review Comment:
I didn't want to mention the local epoch here, since it's more of an implementation detail of how we detect intermediate assignments. But then I suppose I should log the local epoch. Updated it accordingly.

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -1028,9 +1028,9 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.poll(time.milliseconds());
+        membershipManager.onHeartbeatRequestSent();

Review Comment:
Done

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -2279,22 +2277,23 @@ private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscri
         return mockJoinAndReceiveAssignment(expectSubscriptionUpdated, createAssignment(expectSubscriptionUpdated));
     }
-    private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscriptionUpdated,
+    private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean triggerReconciliation,

Review Comment:
Done

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -976,6 +974,10 @@ void maybeReconcile() {
             }
             revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+        }).whenComplete((__, error) -> {
+            if (error != null) {
+                log.error("Reconciliation failed.", error);

Review Comment:
Nope, the exception handling inside `revokeAndAssign` is only triggered if the revoke-and-assign future fails. If `revokeAndAssign` fails outside that future (in particular inside `revokePartitions`, which contains logic that may fail), the exception falls through to here. Before this change it was silently swallowed, which cost me an hour of debugging to find.
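Here is a minimal sketch of the failure mode described above. It assumes a reconciliation step chained with `thenRun` after an already-completed auto-commit future, and `revokeAndAssignThatThrows` is a hypothetical stand-in for `revokeAndAssign` throwing synchronously. An exception thrown inside the lambda does not propagate to the caller. Instead, it completes the returned future exceptionally, wrapped in a `CompletionException`, so if nothing observes that future, the error is lost without any log.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class SwallowedExceptionSketch {
    // Hypothetical stand-in for logic that may throw synchronously (e.g. while revoking partitions).
    static void revokeAndAssignThatThrows() {
        throw new IllegalStateException("failure while revoking partitions");
    }

    // The exception thrown inside thenRun is captured in the returned future. If nobody
    // inspects that future, the failure is invisible: nothing is logged and nothing is rethrown.
    static CompletableFuture<Void> reconcileWithoutHandler(CompletableFuture<Void> commit) {
        return commit.thenRun(SwallowedExceptionSketch::revokeAndAssignThatThrows);
    }

    // Attaching a completion handler to the end of the chain surfaces the failure.
    static CompletableFuture<Void> reconcileWithHandler(CompletableFuture<Void> commit, Consumer<Throwable> log) {
        return reconcileWithoutHandler(commit).whenComplete((__, error) -> {
            if (error != null) {
                log.accept(error); // a CompletionException whose cause is the original exception
            }
        });
    }
}
```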
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1524592939

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ##
@@ -185,4 +185,18 @@ public interface MembershipManager extends RequestManager {
      * releasing its assignment. This is expected to be used when the poll timer is reset.
      */
     void maybeRejoinStaleMember();
+
+    /**
+     * A data structure to represent the current assignment, and current target assignment of a member in a consumer group.
+     *
+     * Besides the assigned partitions, it contains a local epoch that is bumped whenever the assignment changes, to ensure
+     * that two assignments with the same partitions but different local epochs are not considered equal.

Review Comment:
Done (removed the interface)
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1525912448 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member -membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { -data.setInstanceId(groupInstanceId); -sentFields.instanceId = groupInstanceId; -} -}); +// InstanceId - always send when leaving the group as a static member +membershipManager.groupInstanceId().ifPresent(data::setInstanceId); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining +if (membershipManager.memberEpoch() == 0) { Review Comment: Summary of the Slack discussion: we need to resend a full request after a failure, to let the group coordinator (GC) know that it needs to send a full response (including repeating the assignment). The reason is that if a heartbeat request timed out, that heartbeat could have been the one carrying a new assignment for you. If you don't send a full request next, the server will assume that you've received that last response, so it won't deliver the new assignment again. So sending a full request is an implicit "force full response". I reverted that part of the changes (so I brought back the rebalanceMs and serverAssignor in sentFields). We could consider relying only on reset of the heartbeat state (resetting when transitioning to JOINING may be enough). 
However, that seems more brittle: it’s quite easy to introduce a code path where we forget to reset the heartbeat state. Having a contract like “always send when epoch==0” for the joining case is easier in my eyes.
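A minimal sketch of the "sent fields" idea summarized above. It assumes a simplified request modeled as a `Map` and uses hypothetical names (`HeartbeatFields`, `buildRequest`, `reset`) rather than the real `HeartbeatRequestManager` API: optional fields are only included when they differ from what was last sent, and `reset()` makes the next request a full one, which the coordinator can read as "send me a full response".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical sketch of delta heartbeats: optional fields are included only when they
 * differ from the last sent value; reset() forgets everything so the next request is full.
 */
final class HeartbeatFields {
    private Integer sentRebalanceTimeoutMs; // null means "not sent since the last reset"
    private String sentServerAssignor;

    /** Builds the request fields; the member epoch is always included. */
    Map<String, Object> buildRequest(int memberEpoch, int rebalanceTimeoutMs, String serverAssignor) {
        Map<String, Object> request = new LinkedHashMap<>();
        request.put("memberEpoch", memberEpoch);
        if (!Objects.equals(sentRebalanceTimeoutMs, rebalanceTimeoutMs)) {
            request.put("rebalanceTimeoutMs", rebalanceTimeoutMs);
            sentRebalanceTimeoutMs = rebalanceTimeoutMs;
        }
        if (!Objects.equals(sentServerAssignor, serverAssignor)) {
            request.put("serverAssignor", serverAssignor);
            sentServerAssignor = serverAssignor;
        }
        return request;
    }

    /** Called on failure (e.g. a request timeout) or when joining: the next request carries all fields. */
    void reset() {
        sentRebalanceTimeoutMs = null;
        sentServerAssignor = null;
    }
}
```

The trade-off discussed is exactly the one visible here: correctness depends on every failure path calling `reset()`, whereas an "always send when epoch==0" rule needs no bookkeeping but does not cover the request-timeout case.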
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on PR #15511: URL: https://github.com/apache/kafka/pull/15511#issuecomment-1998471082 Thanks for the updates @lucasbru, left some minor comments.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1525452774 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -976,6 +974,10 @@ void maybeReconcile() { } revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions); +}).whenComplete((__, error) -> { +if (error != null) { +log.error("Reconciliation failed.", error); Review Comment: Is there a reason why we want this log here? We already log the same error inside `revokeAndAssign` when the reconciliation completes.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1525446624 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -1028,9 +1028,9 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() { verifyReconciliationNotTriggered(membershipManager); membershipManager.poll(time.milliseconds()); +membershipManager.onHeartbeatRequestSent(); Review Comment: I would suggest we add the check that a reconciliation was triggered here, just adding `verifyReconciliationTriggeredAndCompleted(membershipManager, Collections.emptyList());` right after poll. It's part of what this PR is introducing, and it completes the picture of what happens when getting the first (empty) assignment that can be reconciled.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1525418611 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -2279,22 +2277,23 @@ private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscri return mockJoinAndReceiveAssignment(expectSubscriptionUpdated, createAssignment(expectSubscriptionUpdated)); } -private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscriptionUpdated, +private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean triggerReconciliation, Review Comment: Agree with the renamed param, but could we also update it in the overloaded method above, just for consistency?
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1525386165 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -911,9 +911,13 @@ void maybeReconcile() { SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions); -if (resolvedAssignment.equals(currentAssignment)) { -log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " + -"is equal to the member current assignment.", resolvedAssignment); +if (currentAssignment != LocalAssignmentImpl.NONE && +resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 && + resolvedAssignment.partitions.equals(currentAssignment.partitions)) { +log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " + Review Comment: Just something to consider, to maybe simplify the log and clarify the situation: isn't the message here simply that we're ignoring the reconciliation because the resolved target is equal to the current assignment? I get the point about intermediate assignments, but an intermediate one would have updated the current assignment, so it wouldn't be equal to the resolved target (or it would have left a reconciliation in progress, so we wouldn't even reach this check).
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1525360383 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -988,7 +989,8 @@ long getExpirationTimeForTimeout(final long timeoutMs) { * then complete the reconciliation by updating the assignment and making the appropriate state * transition. Note that if any of the 2 callbacks fails, the reconciliation should fail. */ -private void revokeAndAssign(SortedSet<TopicIdPartition> assignedTopicIdPartitions, +private void revokeAndAssign(LocalAssignmentImpl resolvedAssignment, Review Comment: It was this same one, [KAFKA-16185](https://issues.apache.org/jira/browse/KAFKA-16185), intended to handle the situation around discarding reconciliations (which is what that rejoin check is for). But I had missed it in the review too. I just created [KAFKA-16375](https://issues.apache.org/jira/browse/KAFKA-16375) to handle it in a follow-up PR (probably moving away from all epochs to cover that edge case, and identifying the rejoin simply by the member going through a transition to join).
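The follow-up direction mentioned for KAFKA-16375 (identifying a rejoin by the member going through a transition to join, instead of comparing member epochs) could look roughly like the sketch below. `RejoinTracker` and its counter are hypothetical names, not existing Kafka code. The point is only that an epoch bump from the coordinator does not count as a rejoin, whereas a `memberEpochOnReconciliationStart != memberEpoch` check would flag it.

```java
/**
 * Hypothetical sketch: detect that the member rejoined while a reconciliation was in flight
 * by counting transitions to JOINING, instead of comparing member epochs (which the
 * coordinator may bump without the member ever leaving the group).
 */
final class RejoinTracker {
    private long joinTransitions = 0;
    private int memberEpoch = 0;

    /** Every transition to JOINING counts as a (re)join and resets the member epoch. */
    void transitionToJoining() {
        joinTransitions++;
        memberEpoch = 0;
    }

    /** An epoch bump received in a heartbeat response is not, by itself, a rejoin. */
    void onNewMemberEpoch(int epoch) {
        memberEpoch = epoch;
    }

    /** Captured when a reconciliation starts. */
    long snapshot() {
        return joinTransitions;
    }

    /** True if the member went through JOINING since the snapshot was taken. */
    boolean hasRejoinedSince(long snapshot) {
        return joinTransitions != snapshot;
    }

    int memberEpoch() {
        return memberEpoch;
    }
}
```

A reconciliation would take `snapshot()` when it starts and discard its result if `hasRejoinedSince(snapshot)` is true when it completes.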
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1524575611 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1497,4 +1502,93 @@ public PollResult poll(final long currentTimeMs) { List<MemberStateListener> stateListeners() { return unmodifiableList(stateUpdatesListeners); } + +private final static class LocalAssignmentImpl implements LocalAssignment { + +private static final long NONE_EPOCH = -1; + +private static final LocalAssignmentImpl NONE = new LocalAssignmentImpl(NONE_EPOCH, Collections.emptyMap()); + +private final long localEpoch; + +private final Map<Uuid, SortedSet<Integer>> partitions; + +public LocalAssignmentImpl(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions) { +this.localEpoch = localEpoch; +this.partitions = partitions; +if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) { +throw new IllegalArgumentException("Local epoch must be set if there are partitions"); +} +} + +public LocalAssignmentImpl(long localEpoch, SortedSet<TopicIdPartition> topicIdPartitions) { +this.localEpoch = localEpoch; +this.partitions = new HashMap<>(); +if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) { +throw new IllegalArgumentException("Local epoch must be set if there are partitions"); +} +topicIdPartitions.forEach(topicIdPartition -> { +Uuid topicId = topicIdPartition.topicId(); +partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition()); +}); +} + +@Override +public String toString() { +return "{" + +"localEpoch=" + localEpoch + +", partitions=" + partitions + +'}'; +} + +@Override +public boolean equals(final Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +final LocalAssignmentImpl that = (LocalAssignmentImpl) o; +return localEpoch == that.localEpoch && Objects.equals(partitions, that.partitions); +} + +@Override +public int hashCode() { 
+return Objects.hash(localEpoch, partitions); +} + +@Override +public Map<Uuid, SortedSet<Integer>> getPartitions()
{ +return partitions; +} + +@Override +public boolean isNone() { +return localEpoch == NONE_EPOCH; +} + +Optional<LocalAssignmentImpl> updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) { + +// Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection +if (localEpoch != NONE_EPOCH) { +// check if the new assignment is different from the current target assignment Review Comment: Done ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member -membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { -data.setInstanceId(groupInstanceId); -sentFields.instanceId = groupInstanceId; -} -}); +// InstanceId - always send when leaving the group as a static member +membershipManager.groupInstanceId().ifPresent(data::setInstanceId); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining +if (membershipManager.memberEpoch() == 0) { Review Comment: Done ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -1952,19 +1942,22 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm // Should reset epoch to leave the group and release the assignment (right away because // there is no onPartitionsLost callback defined) verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
-assertTrue(membershipManager.currentAssignment().isEmpty()); +
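To illustrate why `LocalAssignmentImpl` includes the local epoch in `equals`, here is a simplified, self-contained stand-in. Assumptions: topic IDs are plain `String`s instead of `Uuid`, and the class name `LocalAssignmentSketch` is made up. Two assignments with the same partitions but different local epochs compare as different, and `NONE` (epoch -1) differs from an empty assignment at epoch 0, which is what lets an empty first assignment still be reconciled.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Simplified stand-in for LocalAssignmentImpl; topic IDs are Strings here (assumption). */
final class LocalAssignmentSketch {
    static final long NONE_EPOCH = -1;
    static final LocalAssignmentSketch NONE = new LocalAssignmentSketch(NONE_EPOCH, Collections.emptyMap());

    final long localEpoch;
    final Map<String, SortedSet<Integer>> partitions;

    LocalAssignmentSketch(long localEpoch, Map<String, SortedSet<Integer>> partitions) {
        if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
            throw new IllegalArgumentException("Local epoch must be set if there are partitions");
        }
        this.localEpoch = localEpoch;
        // Defensive sorted copy, so equality does not depend on the caller's collection types.
        Map<String, SortedSet<Integer>> copy = new TreeMap<>();
        partitions.forEach((topic, parts) -> copy.put(topic, new TreeSet<>(parts)));
        this.partitions = Collections.unmodifiableMap(copy);
    }

    boolean isNone() {
        return localEpoch == NONE_EPOCH;
    }

    /** Equal only if both the local epoch and the partitions match. */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof LocalAssignmentSketch)) return false;
        LocalAssignmentSketch that = (LocalAssignmentSketch) o;
        return localEpoch == that.localEpoch && partitions.equals(that.partitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(localEpoch, partitions);
    }
}
```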
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
dajac commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1524608939 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member -membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { -data.setInstanceId(groupInstanceId); -sentFields.instanceId = groupInstanceId; -} -}); +// InstanceId - always send when leaving the group as a static member +membershipManager.groupInstanceId().ifPresent(data::setInstanceId); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining +if (membershipManager.memberEpoch() == 0) { Review Comment: I had a second look at all this logic, and I actually wonder if it is better to keep the previous logic for rebalanceTimeoutMs and serverAssignor. My concern is that we should actually send out a full request when `reset` has been called, and with this change that is no longer the case. The main idea was to send a full request when joining or on any error (e.g. a request timeout).
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1524569366 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -530,19 +530,12 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member -membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { -data.setInstanceId(groupInstanceId); -sentFields.instanceId = groupInstanceId; -} -}); +// InstanceId - always send when leaving the group as a static member +membershipManager.groupInstanceId().ifPresent(data::setInstanceId); Review Comment: Ah, I didn't see that one, so the change will go away with a merge.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1524568814 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -889,43 +914,36 @@ private void transitionToStale() { */ void maybeReconcile() { if (targetAssignmentReconciled()) { -log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " + +log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " + "current assignment."); return; } if (reconciliationInProgress) { -log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + +log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + currentTargetAssignment + " will be handled in the next reconciliation loop."); return; } // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update // if some topic IDs are not resolvable. SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); +final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions); -SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); -ownedPartitions.addAll(subscriptions.assignedPartitions()); - -// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are -// being reconciled. Needed for interactions with the centralized subscription state that -// does not support topic IDs yet, and for the callbacks. -SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions); - -// Check same assignment. Based on topic names for now, until topic IDs are properly -// supported in the centralized subscription state object. 
Note that this check is -// required to make sure that reconciliation is not triggered if the assignment ready to -// be reconciled is the same as the current one (even though the member may remain -// in RECONCILING state if it has some unresolved assignments). -boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions); - -if (sameAssignmentReceived) { +if (resolvedAssignment.equals(currentAssignment)) { Review Comment: Okay. I will short-cut the reconciliation if the resolvable assignment did not change and if the local target epoch either did not change or was bumped once (the latter being your example). In the latter case, I'll still bump the local current epoch. It's a bit more special-casing than I was hoping for, but I see that if these "delayed topic names" are common, it'd be annoying to get two reconciliations every time.
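The shortcut described above (skip the reconciliation when the resolvable partitions are unchanged and the local target epoch is the same or was bumped once, but still advance the current epoch) corresponds to the condition `resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1` that appears in a later revision of the diff. Below is a rough sketch with simplified types; `ReconcileShortcut`, `Assignment` and `trySkip` are hypothetical names, and partitions are encoded as `"topic-partition"` strings for brevity.

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical sketch of the reconciliation shortcut discussed in this thread. */
final class ReconcileShortcut {
    static final long NONE_EPOCH = -1;

    /** Simplified assignment: a local epoch plus partitions encoded as "topic-partition" strings. */
    static final class Assignment {
        final long localEpoch;
        final Set<String> partitions;

        Assignment(long localEpoch, Set<String> partitions) {
            this.localEpoch = localEpoch;
            this.partitions = partitions;
        }
    }

    /**
     * Returns the new current assignment if the full reconciliation (and its callbacks) can be
     * skipped, or Optional.empty() if a full reconciliation is needed.
     */
    static Optional<Assignment> trySkip(Assignment current, Assignment resolved) {
        boolean hasCurrent = current.localEpoch != NONE_EPOCH;
        boolean samePartitions = resolved.partitions.equals(current.partitions);
        boolean atMostOneBump = resolved.localEpoch <= current.localEpoch + 1;
        if (hasCurrent && samePartitions && atMostOneBump) {
            // Nothing to revoke or assign, but the current local epoch is still advanced
            // to the target's, as proposed in the comment above.
            return Optional.of(new Assignment(resolved.localEpoch, current.partitions));
        }
        return Optional.empty();
    }
}
```

Note that a member without any assignment yet (`NONE`) never takes the shortcut, so a first, possibly empty, assignment always goes through the full reconciliation and its callbacks.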
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
dajac commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1524430731 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member -membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { -data.setInstanceId(groupInstanceId); -sentFields.instanceId = groupInstanceId; -} -}); +// InstanceId - always send when leaving the group as a static member +membershipManager.groupInstanceId().ifPresent(data::setInstanceId); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining +if (membershipManager.memberEpoch() == 0) { Review Comment: While we are here, we may be able to do the same for the `serverAssignor` field, as it never changes at runtime. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -185,4 +185,18 @@ public interface MembershipManager extends RequestManager { * releasing its assignment. This is expected to be used when the poll timer is reset. */ void maybeRejoinStaleMember(); + +/** + * A data structure to represent the current assignment, and current target assignment of a member in a consumer group. + * + * Besides the assigned partitions, it contains a local epoch that is bumped whenever the assignment changes, to ensure + * that two assignments with the same partitions but different local epochs are not considered equal.
+ */ +interface LocalAssignment { + +Map<Uuid, SortedSet<Integer>> getPartitions(); Review Comment: nit: We usually don't prefix getters with `get`. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -566,18 +560,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // ClientAssignors - not supported yet -// TopicPartitions - only sent if it has changed since the last heartbeat. Note that -// the string consists of just the topic ID and the partitions. When an assignment is -// received, we might not yet know the topic name, and then it is learnt subsequently -// by a metadata update. -TreeSet<String> assignedPartitions = membershipManager.currentAssignment().entrySet().stream() -.map(entry -> entry.getKey() + "-" + entry.getValue()) -.collect(Collectors.toCollection(TreeSet::new)); -if (!assignedPartitions.equals(sentFields.topicPartitions)) { +// TopicPartitions - sent with the first heartbeat after a new assignment from the server was +// reconciled. This is ensured by resending the topic partitions whenever the local assignment, +// including its local epoch is changed (although the local epoch is not sent in the heartbeat). +LocalAssignment local = membershipManager.currentAssignment(); +if (local == null) { Review Comment: nit: We could use `isNone`. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -988,7 +989,8 @@ long getExpirationTimeForTimeout(final long timeoutMs) { * then complete the reconciliation by updating the assignment and making the appropriate state * transition. Note that if any of the 2 callbacks fails, the reconciliation should fail. */ -private void revokeAndAssign(SortedSet<TopicIdPartition> assignedTopicIdPartitions, +private void revokeAndAssign(LocalAssignmentImpl resolvedAssignment, Review Comment: btw, I just noticed the `boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch` condition while reading the code again. 
We have it in two places.
I think that this is wrong because the member epoch could effectively change without leaving. @lianetm I recall that we discussed this a while ago. Do we have a jira to address this? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -1952,19 +1942,22 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm // Should
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on PR #15511: URL: https://github.com/apache/kafka/pull/15511#issuecomment-1995835817 Hey @lucasbru, I took another full pass and left a few comments. Thanks for the changes!
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523858796 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -889,43 +914,36 @@ private void transitionToStale() { */ void maybeReconcile() { if (targetAssignmentReconciled()) { -log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " + +log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " + "current assignment."); return; } if (reconciliationInProgress) { -log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + +log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + currentTargetAssignment + " will be handled in the next reconciliation loop."); return; } // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update // if some topic IDs are not resolvable. SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); +final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions); -SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); -ownedPartitions.addAll(subscriptions.assignedPartitions()); - -// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are -// being reconciled. Needed for interactions with the centralized subscription state that -// does not support topic IDs yet, and for the callbacks. -SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions); - -// Check same assignment. Based on topic names for now, until topic IDs are properly -// supported in the centralized subscription state object. 
Note that this check is -// required to make sure that reconciliation is not triggered if the assignment ready to -// be reconciled is the same as the current one (even though the member may remain -// in RECONCILING state if it has some unresolved assignments). -boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions); - -if (sameAssignmentReceived) { +if (resolvedAssignment.equals(currentAssignment)) { Review Comment: Well, I was expecting that we only need to trigger the callbacks if the assignment changed (it could change to empty, but something needs to change), and that's not the case if the member ends up with t1-1 again, which it already owns. By running a full reconciliation when the resolved assignment is the same as the current one but received later, we end up with a client reconciling the exact same assignment it already owns :S I expect it would turn out noisy, accounting for 2 rebalances in cases that are probably much more common, where a newly assigned topic is temporarily not in metadata and then discovered: the member owns [t1-1] and receives assignment [t1-1, t2-1] with missing metadata for t2 (t2 is discovered shortly after). Wouldn't we generate 2 rebalances (a first one with no changes in the assignment, and a second one with the added topic once it is discovered) when things truly only changed once?
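The scenario above hinges on the "resolvable fragment" of the target: only partitions whose topic IDs already have names in metadata can be reconciled. A minimal sketch of that split follows, assuming topic IDs and names are `String`s and metadata is a plain map. `ResolvableFragment` is a hypothetical helper, not the signature of `findResolvableAssignmentAndTriggerMetadataUpdate`.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical sketch: split a target assignment (topic ID -> partitions) into the part
 * whose topic names are known from metadata and the set of topic IDs still unresolved.
 */
final class ResolvableFragment {
    final Map<String, SortedSet<Integer>> resolved = new TreeMap<>();
    final SortedSet<String> unresolvedTopicIds = new TreeSet<>();

    ResolvableFragment(Map<String, SortedSet<Integer>> target, Map<String, String> topicNamesById) {
        target.forEach((topicId, partitions) -> {
            if (topicNamesById.containsKey(topicId)) {
                resolved.put(topicId, new TreeSet<>(partitions));
            } else {
                // The real client triggers a metadata update for topic IDs it cannot resolve yet.
                unresolvedTopicIds.add(topicId);
            }
        });
    }
}
```

With the member owning [t1-1] and t2 missing from metadata, the resolvable fragment is exactly what the member already owns, so reconciling it would only re-run the callbacks with no change.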
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523903844 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -1952,19 +1942,22 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm // Should reset epoch to leave the group and release the assignment (right away because // there is no onPartitionsLost callback defined) verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); -assertTrue(membershipManager.currentAssignment().isEmpty()); +assertTrue(membershipManager.currentAssignment().isNone()); assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty()); assertEquals(LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch()); } @Test public void testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() { Review Comment: I cannot find any test for one of the 2 issues we're after with this PR: ensuring that we trigger the callbacks when joining and getting an empty assignment. Could we add it? I expect it to be very similar to this one, just providing a `CounterConsumerRebalanceListener` and asserting that it called `onPartitionsAssigned`.
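The requested test boils down to this: a joining member that receives an empty assignment must still get `onPartitionsAssigned` (with an empty collection). Since the Kafka test classes are not available in a standalone snippet, the sketch below uses a hypothetical two-method interface mirroring `ConsumerRebalanceListener` and a toy reconciler that runs callbacks whenever the local epoch advances. The real test would use `CounterConsumerRebalanceListener` with `MembershipManagerImpl`, and the toy's revocation rule is a simplification.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Minimal stand-in for the two callbacks of ConsumerRebalanceListener (hypothetical). */
interface RebalanceCallbacks {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

/** Counts callback invocations, in the spirit of the test listener mentioned in the review. */
final class CountingListener implements RebalanceCallbacks {
    int revokedCount = 0;
    int assignedCount = 0;

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        revokedCount++;
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        assignedCount++;
    }
}

/** Toy reconciler: callbacks run whenever the local epoch advances, even for an empty assignment. */
final class ToyReconciler {
    private long currentEpoch = -1; // -1 = no assignment yet (NONE)
    private final Set<String> owned = new HashSet<>();
    private final RebalanceCallbacks listener;

    ToyReconciler(RebalanceCallbacks listener) {
        this.listener = listener;
    }

    void reconcile(long targetEpoch, Set<String> target) {
        if (targetEpoch == currentEpoch) {
            return; // this version of the target was already reconciled
        }
        Set<String> revoked = new HashSet<>(owned);
        revoked.removeAll(target);
        Set<String> added = new HashSet<>(target);
        added.removeAll(owned);
        if (!revoked.isEmpty()) {
            listener.onPartitionsRevoked(revoked);
        }
        listener.onPartitionsAssigned(added); // may be empty, e.g. the first assignment after joining
        owned.removeAll(revoked);
        owned.addAll(added);
        currentEpoch = targetEpoch;
    }
}
```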
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523885245 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member -membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { -data.setInstanceId(groupInstanceId); -sentFields.instanceId = groupInstanceId; -} -}); +// InstanceId - always send when leaving the group as a static member +membershipManager.groupInstanceId().ifPresent(data::setInstanceId); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining +if (membershipManager.memberEpoch() == 0) { data.setRebalanceTimeoutMs(rebalanceTimeoutMs); -sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs; } if (!this.subscriptions.hasPatternSubscription()) { -// SubscribedTopicNames - only sent if has changed since the last heartbeat +// SubscribedTopicNames - only sent when joining or if has changed since the last heartbeat Review Comment: inherited, but now that we're here: "or if **it** has changed..."
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523863939 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -683,7 +681,10 @@ public void testDelayedMetadataUsedToCompleteAssignment() { receiveAssignment(newAssignment, membershipManager); membershipManager.poll(time.milliseconds()); -verifyReconciliationNotTriggered(membershipManager); +// We bumped the local epoch, so new reconciliation is triggered Review Comment: This newly triggered reconciliation is an example of what I was referring to in the [comment](https://github.com/apache/kafka/pull/15511/files#r1523858796) above, which seems to complicate the flow with 2 rebalances instead of 1. I wouldn't expect a member to rebalance/reconcile at this point (no assignment change for him yet, t2 not in metadata), and then to reconcile/rebalance again once it discovers t2.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523880045 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign */ private void replaceTargetAssignmentWithNewAssignment( ConsumerGroupHeartbeatResponseData.Assignment assignment) { -currentTargetAssignment.clear(); + +// Return if the same as current assignment; comparison without creating a new collection +if (currentTargetAssignment != null) { +// check if the new assignment is different from the current target assignment +if (currentTargetAssignment.partitions.size() == assignment.topicPartitions().size() && +assignment.topicPartitions().stream().allMatch( +tp -> currentTargetAssignment.partitions.containsKey(tp.topicId()) && + currentTargetAssignment.partitions.get(tp.topicId()).size() == tp.partitions().size() && + currentTargetAssignment.partitions.get(tp.topicId()).containsAll(tp.partitions()))) { +return; +} +} + +// Bump local epoch and replace assignment +long nextLocalEpoch; +if (currentTargetAssignment == null) { +nextLocalEpoch = 0; +} else { +nextLocalEpoch = currentTargetAssignment.localEpoch + 1; +} Review Comment: I was wrong in thinking that we could reuse the server epoch here: it doesn't truly represent new assignments for the member, and we could end up receiving multiple new assignments in the same server epoch. So I'm OK with this local epoch approach; it's the best way to keep track of "versions" of the assignment received.
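The comparison in `replaceTargetAssignmentWithNewAssignment` checks the received topic entries directly against the current target map (without building an intermediate collection) and bumps the local epoch only when something differs. Below is a self-contained sketch of that logic. `TargetAssignment` and `TopicPartitionsMsg` are hypothetical stand-ins for the PR's types and for `ConsumerGroupHeartbeatResponseData.TopicPartitions`, and -1 plays the role of `NONE_EPOCH`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

final class TargetAssignment {
    /** Stand-in for one topic entry of a heartbeat response assignment (hypothetical). */
    static final class TopicPartitionsMsg {
        final String topicId;
        final List<Integer> partitions;

        TopicPartitionsMsg(String topicId, List<Integer> partitions) {
            this.topicId = topicId;
            this.partitions = partitions;
        }
    }

    final long localEpoch; // -1 means "no assignment received yet"
    final Map<String, SortedSet<Integer>> partitions;

    TargetAssignment(long localEpoch, Map<String, SortedSet<Integer>> partitions) {
        this.localEpoch = localEpoch;
        this.partitions = partitions;
    }

    static TargetAssignment none() {
        return new TargetAssignment(-1, new TreeMap<>());
    }

    /**
     * Returns a new target with a bumped local epoch if the received assignment differs,
     * or Optional.empty() if it matches the current one. The check walks the received
     * entries directly instead of first converting them into a new map.
     */
    Optional<TargetAssignment> updateWith(List<TopicPartitionsMsg> received) {
        if (localEpoch != -1) {
            boolean same = partitions.size() == received.size() && received.stream().allMatch(tp ->
                    partitions.containsKey(tp.topicId)
                            && partitions.get(tp.topicId).size() == tp.partitions.size()
                            && partitions.get(tp.topicId).containsAll(tp.partitions));
            if (same) {
                return Optional.empty();
            }
        }
        Map<String, SortedSet<Integer>> next = new TreeMap<>();
        for (TopicPartitionsMsg tp : received) {
            next.computeIfAbsent(tp.topicId, k -> new TreeSet<>()).addAll(tp.partitions);
        }
        return Optional.of(new TargetAssignment(localEpoch + 1, next)); // first assignment gets epoch 0
    }
}
```

Like the diff, the check relies on sizes plus `containsAll`, so it assumes the received partition lists contain no duplicates. Skipping the check while no assignment exists is what gives an empty first assignment its own epoch 0.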
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523858796 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -889,43 +914,36 @@ private void transitionToStale() { */ void maybeReconcile() { if (targetAssignmentReconciled()) { -log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " + +log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " + "current assignment."); return; } if (reconciliationInProgress) { -log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + +log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + currentTargetAssignment + " will be handled in the next reconciliation loop."); return; } // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update // if some topic IDs are not resolvable. SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); +final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions); -SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); -ownedPartitions.addAll(subscriptions.assignedPartitions()); - -// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are -// being reconciled. Needed for interactions with the centralized subscription state that -// does not support topic IDs yet, and for the callbacks. -SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions); - -// Check same assignment. Based on topic names for now, until topic IDs are properly -// supported in the centralized subscription state object. Note that this check is -// required to make sure that reconciliation is not triggered if the assignment ready to -// be reconciled is the same as the current one (even though the member may remain -// in RECONCILING state if it has some unresolved assignments). -boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions); - -if (sameAssignmentReceived) { +if (resolvedAssignment.equals(currentAssignment)) { Review Comment: Well, I was expecting that we only need to trigger the callbacks if the assignment changed (it could change to empty, but something needs to change), and that's not the case if the member ends up with t1-1 again, which it already owns. By running a full reconciliation when the resolved assignment is the same as the current one but received later, we end up with a client reconciling the exact same assignment it already owns :S I expect it would turn out noisy, accounting for 2 rebalances in cases that are probably much more common, where a newly assigned topic is temporarily not in metadata and then discovered: the member owns [t1-1] and receives assignment [t1-1, t2-1] with missing metadata for t2 (t2 is discovered shortly after). We would generate 2 rebalances (a 1st one with no change in assignment, a 2nd one with the added topic once discovered) when things truly changed only once. Thoughts?
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523734711 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1497,4 +1502,93 @@ public PollResult poll(final long currentTimeMs) { List stateListeners() { return unmodifiableList(stateUpdatesListeners); } + +private final static class LocalAssignmentImpl implements LocalAssignment { + +private static final long NONE_EPOCH = -1; + +private static final LocalAssignmentImpl NONE = new LocalAssignmentImpl(NONE_EPOCH, Collections.emptyMap()); + +private final long localEpoch; + +private final Map<Uuid, SortedSet<Integer>> partitions; + +public LocalAssignmentImpl(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions) { +this.localEpoch = localEpoch; +this.partitions = partitions; +if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) { +throw new IllegalArgumentException("Local epoch must be set if there are partitions"); +} +} + +public LocalAssignmentImpl(long localEpoch, SortedSet<TopicIdPartition> topicIdPartitions) { +this.localEpoch = localEpoch; +this.partitions = new HashMap<>(); +if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) { +throw new IllegalArgumentException("Local epoch must be set if there are partitions"); +} +topicIdPartitions.forEach(topicIdPartition -> { +Uuid topicId = topicIdPartition.topicId(); +partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition()); +}); +} + +@Override +public String toString() { +return "{" + +"localEpoch=" + localEpoch + +", partitions=" + partitions + +'}'; +} + +@Override +public boolean equals(final Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +final LocalAssignmentImpl that = (LocalAssignmentImpl) o; +return localEpoch == that.localEpoch && Objects.equals(partitions, that.partitions); +} + +@Override +public int hashCode() { +return Objects.hash(localEpoch, partitions); +} + +@Override +public Map<Uuid, SortedSet<Integer>> getPartitions() { +return partitions; +} + +@Override +public boolean isNone() { +return localEpoch == NONE_EPOCH; +} + +Optional updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) { + +// Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection +if (localEpoch != NONE_EPOCH) { +// check if the new assignment is different from the current target assignment Review Comment: nit: here we are checking whether the new assignment is "the same as" the current one (not different). Maybe just remove this comment, as the one above seems enough?
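The second `LocalAssignmentImpl` constructor above groups a flat, sorted set of topic-ID/partition pairs into a map from topic ID to sorted partitions via `computeIfAbsent`. A standalone sketch of just that grouping step, assuming hypothetical stand-ins: `TopicIdPartitionSketch` replaces `TopicIdPartition`, and topic IDs are `String`s rather than `Uuid`s.

```java
import java.util.*;

/** Hypothetical stand-in for TopicIdPartition; the topic ID is a String instead of a Uuid. */
final class TopicIdPartitionSketch {
    final String topicId;
    final int partition;

    TopicIdPartitionSketch(String topicId, int partition) {
        this.topicId = topicId;
        this.partition = partition;
    }
}

final class GroupByTopicSketch {
    /** Groups flat (topicId, partition) pairs into topicId -> sorted set of partitions. */
    static Map<String, SortedSet<Integer>> group(Collection<TopicIdPartitionSketch> topicIdPartitions) {
        Map<String, SortedSet<Integer>> partitions = new HashMap<>();
        topicIdPartitions.forEach(tp ->
                partitions.computeIfAbsent(tp.topicId, k -> new TreeSet<>()).add(tp.partition));
        return partitions;
    }
}
```

Because both the epoch and this grouped map take part in `equals`, two assignments with identical partitions but different local epochs compare as different.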
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523709384 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -530,19 +530,12 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member -membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { -data.setInstanceId(groupInstanceId); -sentFields.instanceId = groupInstanceId; -} -}); +// InstanceId - always send when leaving the group as a static member +membershipManager.groupInstanceId().ifPresent(data::setInstanceId); Review Comment: Yes, that's the current expectation, so it looks good. Let's just update the comment.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523523053 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // ClientAssignors - not supported yet -// TopicPartitions - only sent if it has changed since the last heartbeat. Note that -// the string consists of just the topic ID and the partitions. When an assignment is -// received, we might not yet know the topic name, and then it is learnt subsequently -// by a metadata update. -TreeSet<String> assignedPartitions = membershipManager.currentAssignment().entrySet().stream() -.map(entry -> entry.getKey() + "-" + entry.getValue()) -.collect(Collectors.toCollection(TreeSet::new)); -if (!assignedPartitions.equals(sentFields.topicPartitions)) { +// TopicPartitions - sent with the first heartbeat after a new assignment from the server was +// reconciled. This is ensured by resending the topic partitions whenever the local assignment, +// including its local epoch is changed (although the local epoch is not sent in the heartbeat). +LocalAssignment local = membershipManager.currentAssignment(); +if (local == null) { +data.setTopicPartitions(Collections.emptyList()); +sentFields.topicPartitions = null; +} else if (!local.equals(sentFields.topicPartitions)) { Review Comment: Renamed the field.
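A hedged sketch of the change-detection idea in the new `TopicPartitions` comment: the client remembers the last local assignment it sent, and `equals` covers both the local epoch and the partitions, so reconciling the same partitions under a new local epoch still causes a resend. The epoch itself is never returned for sending. `TopicPartitionsResendSketch`, `Snapshot`, and the `null`-means-omit convention are hypothetical simplifications, not the real `ConsumerGroupHeartbeatRequestData` API.

```java
import java.util.*;

/** Hypothetical sketch of "resend TopicPartitions when the local assignment changes". */
final class TopicPartitionsResendSketch {
    /** Snapshot of a local assignment; equals() compares the epoch and the partitions. */
    static final class Snapshot {
        final long localEpoch;
        final SortedSet<String> partitions;

        Snapshot(long localEpoch, Collection<String> partitions) {
            this.localEpoch = localEpoch;
            this.partitions = new TreeSet<>(partitions);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Snapshot)) return false;
            Snapshot that = (Snapshot) o;
            return localEpoch == that.localEpoch && partitions.equals(that.partitions);
        }

        @Override
        public int hashCode() {
            return Objects.hash(localEpoch, partitions);
        }
    }

    private Snapshot lastSent = null;

    /** Returns the partitions for the next heartbeat, or null to leave the field unset. */
    List<String> topicPartitionsToSend(Snapshot current) {
        if (current == null) {            // no assignment: send an empty list, forget what was sent
            lastSent = null;
            return Collections.emptyList();
        }
        if (current.equals(lastSent)) {
            return null;                  // unchanged since the last heartbeat
        }
        lastSent = current;
        return new ArrayList<>(current.partitions);   // the local epoch is not sent
    }
}
```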
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523522542 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } }); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat +if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { Review Comment: Done ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign */ private void replaceTargetAssignmentWithNewAssignment( ConsumerGroupHeartbeatResponseData.Assignment assignment) { -currentTargetAssignment.clear(); + Review Comment: Implemented both changes
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523522030 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -210,12 +213,12 @@ public class MembershipManagerImpl implements MembershipManager { private final Map<Uuid, String> assignedTopicNamesCache; /** - * Topic IDs and partitions received in the last target assignment. Items are added to this set - * every time a target assignment is received. This is where the member collects the assignment - * received from the broker, even though it may not be ready to fully reconcile due to missing - * metadata. + * Topic IDs and partitions received in the last target assignment, together with its local epoch. + * + * This member reassigned every time a new assignment is received. Review Comment: Fixed
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523520659 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -530,19 +530,12 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member -membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { -data.setInstanceId(groupInstanceId); -sentFields.instanceId = groupInstanceId; -} -}); +// InstanceId - always send when leaving the group as a static member +membershipManager.groupInstanceId().ifPresent(data::setInstanceId); Review Comment: I updated the instance ID to be sent on every heartbeat whenever it is present. I think the latest information was that the group coordinator relies on this.
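A minimal sketch of the two heartbeat field rules in the diffs of this thread, as they appear there: `InstanceId` is set whenever the member has one, and `RebalanceTimeoutMs` is set when joining (member epoch 0) or when the value changed since the last heartbeat. The `Request` holder (with `null` meaning "not set") and `HeartbeatFieldsSketch` are hypothetical simplifications of `ConsumerGroupHeartbeatRequestData` and the request manager's sent-fields tracking. The final merged condition may differ, since the rebalance-timeout check was later simplified.

```java
import java.util.*;

/** Hypothetical sketch of per-heartbeat field selection; not the real request classes. */
final class HeartbeatFieldsSketch {
    /** Simplified request; a null field means "not set in this heartbeat". */
    static final class Request {
        String instanceId;
        Integer rebalanceTimeoutMs;
    }

    private int sentRebalanceTimeoutMs = -1;   // nothing sent yet

    Request build(int memberEpoch, Optional<String> groupInstanceId, int rebalanceTimeoutMs) {
        Request data = new Request();
        // InstanceId: sent on every heartbeat whenever the member has one (static membership).
        groupInstanceId.ifPresent(id -> data.instanceId = id);
        // RebalanceTimeoutMs: sent when joining (member epoch 0) or when the value changed.
        if (memberEpoch == 0 || sentRebalanceTimeoutMs != rebalanceTimeoutMs) {
            data.rebalanceTimeoutMs = rebalanceTimeoutMs;
            sentRebalanceTimeoutMs = rebalanceTimeoutMs;
        }
        return data;
    }
}
```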
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1522964906 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign */ private void replaceTargetAssignmentWithNewAssignment( ConsumerGroupHeartbeatResponseData.Assignment assignment) { -currentTargetAssignment.clear(); + +// Return if the same as current assignment; comparison without creating a new collection +if (currentTargetAssignment != null) { +// check if the new assignment is different from the current target assignment +if (currentTargetAssignment.partitions.size() == assignment.topicPartitions().size() && +assignment.topicPartitions().stream().allMatch( +tp -> currentTargetAssignment.partitions.containsKey(tp.topicId()) && + currentTargetAssignment.partitions.get(tp.topicId()).size() == tp.partitions().size() && + currentTargetAssignment.partitions.get(tp.topicId()).containsAll(tp.partitions())))) { +return; +} +} + +// Bump local epoch and replace assignment +long nextLocalEpoch; +if (currentTargetAssignment == null) { +nextLocalEpoch = 0; +} else { +nextLocalEpoch = currentTargetAssignment.localEpoch + 1; +} Review Comment: I think the server is bumping the target member epoch when any assignment for any member changes, while the local epoch is only bumped if the assignment for this specific member changes. So we should get fewer reconciliations.
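A small, hypothetical illustration of the argument above, assuming (as the comment suggests) that the server epoch can advance even when this member's own partitions are unchanged. For a sequence of assignments received by one member, the local epoch advances only when the member's partitions differ from the previous ones. `EpochComparisonSketch` is an illustrative name, not part of the PR.

```java
import java.util.*;

/** Hypothetical illustration: local epoch bumps only on changes to this member's partitions. */
final class EpochComparisonSketch {
    /** Counts local epoch bumps for a sequence of assignments received by one member. */
    static int localEpochBumps(List<Set<String>> receivedAssignments) {
        int bumps = 0;
        Set<String> previous = null;                 // no assignment yet
        for (Set<String> assignment : receivedAssignments) {
            if (!assignment.equals(previous)) {      // only this member's partitions matter
                bumps++;
                previous = assignment;
            }
        }
        return bumps;
    }
}
```

For example, three responses carrying server epochs 3, 4 and 5 with partitions [t1-1], [t1-1] and [t1-1, t2-1] advance the server epoch three times, but produce only two local epochs, so only two reconciliations.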
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1522901846 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -889,43 +914,36 @@ private void transitionToStale() { */ void maybeReconcile() { if (targetAssignmentReconciled()) { -log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " + +log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " + "current assignment."); return; } if (reconciliationInProgress) { -log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + +log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + currentTargetAssignment + " will be handled in the next reconciliation loop."); return; } // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update // if some topic IDs are not resolvable. SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); +final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions); -SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); -ownedPartitions.addAll(subscriptions.assignedPartitions()); - -// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are -// being reconciled. Needed for interactions with the centralized subscription state that -// does not support topic IDs yet, and for the callbacks. -SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions); - -// Check same assignment. Based on topic names for now, until topic IDs are properly -// supported in the centralized subscription state object. Note that this check is -// required to make sure that reconciliation is not triggered if the assignment ready to -// be reconciled is the same as the current one (even though the member may remain -// in RECONCILING state if it has some unresolved assignments). -boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions); - -if (sameAssignmentReceived) { +if (resolvedAssignment.equals(currentAssignment)) { Review Comment: I guess we could do it, but do we have to, and is it the best thing to do? It seems like a corner case to me, and maybe the easiest and cleanest behavior is to just run a full reconciliation, because the assignment changed in the meantime, even if our client never managed to reconcile the intermediate assignment. It seems users are using the ConsumerRebalanceListener to detect rebalances (as do our system tests). So the case you are describing sounds like a rebalance event to me, and I think we should call the listener.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1521937262 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -210,12 +213,12 @@ public class MembershipManagerImpl implements MembershipManager { private final Map<Uuid, String> assignedTopicNamesCache; /** - * Topic IDs and partitions received in the last target assignment. Items are added to this set - * every time a target assignment is received. This is where the member collects the assignment - * received from the broker, even though it may not be ready to fully reconcile due to missing - * metadata. + * Topic IDs and partitions received in the last target assignment, together with its local epoch. + * + * This member reassigned every time a new assignment is received. Review Comment: I don't quite get this sentence (was the intention just to start with `Reassigned every`... I guess...?)
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1521933499 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -889,43 +914,36 @@ private void transitionToStale() { */ void maybeReconcile() { if (targetAssignmentReconciled()) { -log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " + +log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " + "current assignment."); return; } if (reconciliationInProgress) { -log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + +log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + currentTargetAssignment + " will be handled in the next reconciliation loop."); return; } // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update // if some topic IDs are not resolvable. SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); +final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions); -SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); -ownedPartitions.addAll(subscriptions.assignedPartitions()); - -// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are -// being reconciled. Needed for interactions with the centralized subscription state that -// does not support topic IDs yet, and for the callbacks. -SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions); - -// Check same assignment. Based on topic names for now, until topic IDs are properly -// supported in the centralized subscription state object. Note that this check is -// required to make sure that reconciliation is not triggered if the assignment ready to -// be reconciled is the same as the current one (even though the member may remain -// in RECONCILING state if it has some unresolved assignments). -boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions); - -if (sameAssignmentReceived) { +if (resolvedAssignment.equals(currentAssignment)) { Review Comment: This is short-circuiting the reconciliation if the same assignment is received (same epoch, same partitions). But we also need to consider the case where we get the same partitions assigned but in a different epoch. In that case, we should not carry on with the full reconciliation process (there is truly nothing to reconcile), but we should send an ack to the broker, so I would expect we need a similar short-circuit for `if sameAssignmentInDiffEpoch => transitionToAck & return;`. I'm mainly thinking about this case: - member owns t1-1 at epoch 3 - receives new assignment [t1-1, t2-1] at epoch 4 and gets stuck reconciling, e.g. missing t2 metadata - receives new assignment [t1-1] at epoch 5 (e.g. the broker realized t2 has been deleted) - member does not need to reconcile t1-1, but should send an ack to the broker with t1-1, which it received in a newer epoch. Makes sense? Not sure if I'm missing something regarding the expectations.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1521750747 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign */ private void replaceTargetAssignmentWithNewAssignment( ConsumerGroupHeartbeatResponseData.Assignment assignment) { -currentTargetAssignment.clear(); + Review Comment: 1. I did not do it because the `LocalAssignment` is leaked from this file via the `currentAssignment` method, and I didn't necessarily want to put so much logic in the public interface `MembershipManager`. However, I think I could define the return value of `currentAssignment` by a light interface, and then put the fat class with all the updating logic in here. I'll give it a try. 2. I wouldn't necessarily call it `EMPTY` (to avoid confusion with an empty assignment), but rather `NONE` or something similar; other than that, it sounds like a good idea. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } }); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat +if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { Review Comment: True, this can be simplified. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // ClientAssignors - not supported yet -// TopicPartitions - only sent if it has changed since the last heartbeat. Note that -// the string consists of just the topic ID and the partitions. When an assignment is -// received, we might not yet know the topic name, and then it is learnt subsequently -// by a metadata update. -TreeSet<String> assignedPartitions = membershipManager.currentAssignment().entrySet().stream() -.map(entry -> entry.getKey() + "-" + entry.getValue()) -.collect(Collectors.toCollection(TreeSet::new)); -if (!assignedPartitions.equals(sentFields.topicPartitions)) { +// TopicPartitions - sent with the first heartbeat after a new assignment from the server was +// reconciled. This is ensured by resending the topic partitions whenever the local assignment, +// including its local epoch is changed (although the local epoch is not sent in the heartbeat). +LocalAssignment local = membershipManager.currentAssignment(); +if (local == null) { +data.setTopicPartitions(Collections.emptyList()); +sentFields.topicPartitions = null; +} else if (!local.equals(sentFields.topicPartitions)) { Review Comment: That's what is being done. Maybe I should have renamed the field `sentFields.topicPartitions` to `sentFields.localAssignment`, but the type is now `LocalAssignment`, and its `equals` compares the local epoch as well.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521745679

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
*/
private void replaceTargetAssignmentWithNewAssignment(
ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+
+// Return if the same as current assignment; comparison without creating a new collection
+if (currentTargetAssignment != null) {
+// check if the new assignment is different from the current target assignment
+if (currentTargetAssignment.partitions.size() == assignment.topicPartitions().size() &&
+assignment.topicPartitions().stream().allMatch(
+tp -> currentTargetAssignment.partitions.containsKey(tp.topicId()) &&
+currentTargetAssignment.partitions.get(tp.topicId()).size() == tp.partitions().size() &&
+currentTargetAssignment.partitions.get(tp.topicId()).containsAll(tp.partitions()))) {
+return;
+}
+}
+
+// Bump local epoch and replace assignment
+long nextLocalEpoch;
+if (currentTargetAssignment == null) {
+nextLocalEpoch = 0;
+} else {
+nextLocalEpoch = currentTargetAssignment.localEpoch + 1;
+}

Review Comment:
Is there a reason why we need to compute epochs on the client here? The server is the one bumping the epochs whenever it computes a new assignment for a member. I was expecting that we just keep a `LocalAssignment` that will contain the partitions and epoch the broker sent in the `ConsumerGroupHeartbeatResponseData`.
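The replace-and-bump logic in the diff above can be modeled in isolation. This is a simplified sketch, not the PR's code: `TargetAssignment` and `TargetAssignmentTracker` are hypothetical, topic IDs are `String`s, and the incoming server assignment is a plain `Map<String, List<Integer>>` instead of `ConsumerGroupHeartbeatResponseData.Assignment`. It shows the client-side behavior being questioned: an identical assignment keeps the current local epoch, while any change bumps it, starting from 0.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the target assignment kept by the member.
final class TargetAssignment {
    final long localEpoch;
    final Map<String, Set<Integer>> partitions; // topic ID (String here) -> partitions

    TargetAssignment(long localEpoch, Map<String, Set<Integer>> partitions) {
        this.localEpoch = localEpoch;
        this.partitions = partitions;
    }
}

final class TargetAssignmentTracker {
    TargetAssignment current; // null until the first assignment is received

    // Keeps the current target if the incoming partitions are identical;
    // otherwise replaces it and bumps the local epoch (0 for the first one).
    void replaceWith(Map<String, List<Integer>> incoming) {
        if (current != null && sameAs(current.partitions, incoming)) {
            return;
        }
        long nextLocalEpoch = current == null ? 0 : current.localEpoch + 1;
        Map<String, Set<Integer>> copy = new HashMap<>();
        for (Map.Entry<String, List<Integer>> entry : incoming.entrySet()) {
            copy.put(entry.getKey(), new HashSet<>(entry.getValue()));
        }
        current = new TargetAssignment(nextLocalEpoch, Collections.unmodifiableMap(copy));
    }

    // Same checks as the allMatch in the diff: topic count, then per-topic size
    // and containment, without building a new collection for the comparison.
    private static boolean sameAs(Map<String, Set<Integer>> existing,
                                  Map<String, List<Integer>> incoming) {
        if (existing.size() != incoming.size()) {
            return false;
        }
        for (Map.Entry<String, List<Integer>> entry : incoming.entrySet()) {
            Set<Integer> assigned = existing.get(entry.getKey());
            if (assigned == null
                    || assigned.size() != entry.getValue().size()
                    || !assigned.containsAll(entry.getValue())) {
                return false;
            }
        }
        return true;
    }
}
```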
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521632809

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
}
});
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat
+if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {

Review Comment:
It is static, taken from `max.poll.interval.ms`, so I agree we could set it only when epoch == 0.
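A minimal sketch of the simplification agreed on here, assuming hypothetical `HeartbeatRequestSketch` and `HeartbeatBuilderSketch` classes (not Kafka's `ConsumerGroupHeartbeatRequestData`), where a `null` field means the value is omitted from the heartbeat. Since the timeout is static, including it only when the member epoch is 0 (joining) is enough, and no "last sent" value needs to be tracked.

```java
// Hypothetical request model: a null field means "not included in this heartbeat".
final class HeartbeatRequestSketch {
    Integer rebalanceTimeoutMs;
}

// Hypothetical builder; the timeout is fixed at construction time,
// e.g. taken from max.poll.interval.ms.
final class HeartbeatBuilderSketch {
    private final int rebalanceTimeoutMs;

    HeartbeatBuilderSketch(int rebalanceTimeoutMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    HeartbeatRequestSketch build(int memberEpoch) {
        HeartbeatRequestSketch request = new HeartbeatRequestSketch();
        // The value cannot change while the consumer runs, so sending it only
        // on the joining heartbeat (member epoch 0) is sufficient.
        if (memberEpoch == 0) {
            request.rebalanceTimeoutMs = rebalanceTimeoutMs;
        }
        return request;
    }
}
```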
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
dajac commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521552751

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
}
});
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat
+if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {

Review Comment:
If I'm not mistaken, `rebalanceTimeoutMs` is a static config, so we could actually set it only when joining the group (when epoch == 0).

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
*/
private void replaceTargetAssignmentWithNewAssignment(
ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+

Review Comment:
I have two high-level thoughts, but I am not sure whether they are worth it:
1) Have you considered moving all the update logic into `LocalAssignment`? We could have a method such as `updateWith(Assignment)` which returns an `Optional` containing the new assignment if it was updated.
2) Along similar lines, I wonder if we could have an `EMPTY` constant for the default `LocalAssignment(-1, null)` instead of relying on `null`. The reasoning behind this one is that it avoids having to deal with `null` in a few places in this file.
What do you think?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
// ClientAssignors - not supported yet
-// TopicPartitions - only sent if it has changed since the last heartbeat. Note that
-// the string consists of just the topic ID and the partitions. When an assignment is
-// received, we might not yet know the topic name, and then it is learnt subsequently
-// by a metadata update.
-TreeSet assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
-.map(entry -> entry.getKey() + "-" + entry.getValue())
-.collect(Collectors.toCollection(TreeSet::new));
-if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+// TopicPartitions - sent with the first heartbeat after a new assignment from the server was
+// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
+// including its local epoch is changed (although the local epoch is not sent in the heartbeat).
+LocalAssignment local = membershipManager.currentAssignment();
+if (local == null) {
+data.setTopicPartitions(Collections.emptyList());
+sentFields.topicPartitions = null;
+} else if (!local.equals(sentFields.topicPartitions)) {

Review Comment:
Don't we also need to take the assignment epoch into consideration here? In other words, should we store the `LocalAssignment` in `sentFields` and use it for the comparison?
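The two high-level suggestions in this comment (an `updateWith` method returning an `Optional`, and a sentinel constant instead of `null`) can be combined in a small sketch. Everything here is hypothetical and simplified: `LocalAssignmentSketch`, its `NONE` sentinel (the name proposed in the reply instead of `EMPTY`), and an `updateWith` that takes a `Map<String, List<Integer>>` rather than the real `Assignment` type. The sentinel uses local epoch -1 and an empty map (instead of `null`), so the first real assignment gets local epoch 0 and an empty assignment from the server stays distinguishable from "no assignment yet".

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of the suggested API shape, not the code merged in the PR.
final class LocalAssignmentSketch {
    // Sentinel for "no assignment yet". Local epoch -1 keeps it distinct from an
    // empty assignment received from the server, which gets an epoch >= 0.
    static final LocalAssignmentSketch NONE =
        new LocalAssignmentSketch(-1, Collections.<String, Set<Integer>>emptyMap());

    final long localEpoch;
    final Map<String, Set<Integer>> partitions; // topic ID (String here) -> partitions

    LocalAssignmentSketch(long localEpoch, Map<String, Set<Integer>> partitions) {
        this.localEpoch = localEpoch;
        this.partitions = partitions;
    }

    // Returns the new assignment with a bumped local epoch, or Optional.empty()
    // if the incoming partitions equal the current ones. NONE always updates.
    Optional<LocalAssignmentSketch> updateWith(Map<String, List<Integer>> incoming) {
        Map<String, Set<Integer>> next = new HashMap<>();
        for (Map.Entry<String, List<Integer>> entry : incoming.entrySet()) {
            next.put(entry.getKey(), new HashSet<>(entry.getValue()));
        }
        if (this != NONE && next.equals(partitions)) {
            return Optional.empty();
        }
        return Optional.of(new LocalAssignmentSketch(localEpoch + 1, Collections.unmodifiableMap(next)));
    }
}
```

A caller that keeps the current assignment in a field, initialized to `NONE`, can apply updates with `updateWith(...).ifPresent(...)` and never needs a `null` check.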
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on PR #15511:
URL: https://github.com/apache/kafka/pull/15511#issuecomment-1988762078

@lianetm @dajac Could you please have a look?