Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-18 Thread via GitHub


lucasbru merged PR #15511:
URL: https://github.com/apache/kafka/pull/15511


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lianetm commented on PR #15511:
URL: https://github.com/apache/kafka/pull/15511#issuecomment-2000323516

   Went over the latest updates, LGTM. Thanks! 





Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526716168


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -565,6 +565,58 @@ public void testSameAssignmentReconciledAgainWhenFenced() {
 assertEquals(toTopicIdPartitionMap(assignment1), membershipManager.currentAssignment().partitions);
 }
 
+/**
+ * This is the case where we receive a new assignment while reconciling an existing one. The intermediate assignment
+ * is not applied, and a new assignment containing the same partitions is received and reconciled. In all assignments,
+ * one topic is not resolvable.
+ *
+ * We need to make sure that the last assignment is acked and applied, even though the set of partitions does not change.
+ * In this case, no rebalance listeners are run.
+ */
+@Test
+public void testSameAssignmentReconciledAgainWithMissingTopic() {
+MembershipManagerImpl membershipManager = createMemberInStableState();
+Uuid topic1 = Uuid.randomUuid();
+Uuid topic2 = Uuid.randomUuid();
+final Assignment assignment1 = new ConsumerGroupHeartbeatResponseData.Assignment()
+.setTopicPartitions(Arrays.asList(
+new TopicPartitions().setTopicId(topic1).setPartitions(Collections.singletonList(0)),
+new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+));
+final Assignment assignment2 = new ConsumerGroupHeartbeatResponseData.Assignment()
+.setTopicPartitions(Arrays.asList(
+new TopicPartitions().setTopicId(topic1).setPartitions(Arrays.asList(0, 1)),
+new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+));
+when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1, "topic1"));
+
+// Receive assignment - full reconciliation triggered
+// stay in RECONCILING state, since an unresolved topic is assigned
+membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1).data());
+assertEquals(MemberState.RECONCILING, membershipManager.state());
+membershipManager.poll(time.milliseconds());
+verifyReconciliationTriggeredAndCompleted(membershipManager,
+Collections.singletonList(new TopicIdPartition(topic1, new TopicPartition("topic1", 0)))
+);
+membershipManager.onHeartbeatRequestSent();
+assertEquals(MemberState.RECONCILING, membershipManager.state());
+clearInvocations(membershipManager);
+
+// Receive extended assignment - assignment received but no reconciliation triggered
+membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2).data());
+assertEquals(MemberState.RECONCILING, membershipManager.state());
+verifyReconciliationNotTriggered(membershipManager);

Review Comment:
   Exactly






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526715547


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -565,6 +565,58 @@ public void testSameAssignmentReconciledAgainWhenFenced() {
 assertEquals(toTopicIdPartitionMap(assignment1), membershipManager.currentAssignment().partitions);
 }
 
+/**
+ * This is the case where we receive a new assignment while reconciling an existing one. The intermediate assignment
+ * is not applied, and a new assignment containing the same partitions is received and reconciled. In all assignments,
+ * one topic is not resolvable.
+ *
+ * We need to make sure that the last assignment is acked and applied, even though the set of partitions does not change.
+ * In this case, no rebalance listeners are run.
+ */
+@Test
+public void testSameAssignmentReconciledAgainWithMissingTopic() {
+MembershipManagerImpl membershipManager = createMemberInStableState();
+Uuid topic1 = Uuid.randomUuid();
+Uuid topic2 = Uuid.randomUuid();
+final Assignment assignment1 = new ConsumerGroupHeartbeatResponseData.Assignment()
+.setTopicPartitions(Arrays.asList(
+new TopicPartitions().setTopicId(topic1).setPartitions(Collections.singletonList(0)),
+new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+));
+final Assignment assignment2 = new ConsumerGroupHeartbeatResponseData.Assignment()
+.setTopicPartitions(Arrays.asList(
+new TopicPartitions().setTopicId(topic1).setPartitions(Arrays.asList(0, 1)),
+new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+));
+when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1, "topic1"));
+
+// Receive assignment - full reconciliation triggered
+// stay in RECONCILING state, since an unresolved topic is assigned
+membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1).data());
+assertEquals(MemberState.RECONCILING, membershipManager.state());
+membershipManager.poll(time.milliseconds());
+verifyReconciliationTriggeredAndCompleted(membershipManager,
+Collections.singletonList(new TopicIdPartition(topic1, new TopicPartition("topic1", 0)))
+);
+membershipManager.onHeartbeatRequestSent();
+assertEquals(MemberState.RECONCILING, membershipManager.state());
+clearInvocations(membershipManager);
+
+// Receive extended assignment - assignment received but no reconciliation triggered
+membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2).data());
+assertEquals(MemberState.RECONCILING, membershipManager.state());
+verifyReconciliationNotTriggered(membershipManager);

Review Comment:
   Just to double-check: it is intentional that you're not calling poll here, right? That's why a reconciliation is not triggered here; otherwise I would expect a reconciliation to be triggered for t1-1. I guess this is what you refer to as an "intermediate" assignment (an assignment that is received and then replaced before a call to poll).






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526581796


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -889,42 +894,39 @@ private void transitionToStale() {
  */
 void maybeReconcile() {
 if (targetAssignmentReconciled()) {
-log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
 "current assignment.");
 return;
 }
 if (reconciliationInProgress) {
-log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
 currentTargetAssignment + " will be handled in the next reconciliation loop.");
 return;
 }
 
 // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
 // if some topic IDs are not resolvable.
 SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
+
+if (!currentAssignment.isNone() &&
+resolvedAssignment.partitions.equals(currentAssignment.partitions)) {

Review Comment:
   Done



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -889,42 +894,39 @@ private void transitionToStale() {
  */
 void maybeReconcile() {
 if (targetAssignmentReconciled()) {
-log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
 "current assignment.");
 return;
 }
 if (reconciliationInProgress) {
-log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
 currentTargetAssignment + " will be handled in the next reconciliation loop.");
 return;
 }
 
 // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
 // if some topic IDs are not resolvable.
 SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
+
+if (!currentAssignment.isNone() &&
+resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+log.debug("There are unresolved partitions, and the resolvable fragment of the  target assignment {} is equal to the current "

Review Comment:
   Done



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -970,7 +973,11 @@ void maybeReconcile() {
 log.debug("Auto-commit before reconciling new assignment completed successfully.");
 }
 
-revokeAndAssign(assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+}).whenComplete((__, error) -> {

Review Comment:
   Done - but it requires a `return null` :/
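   For context on the `exceptionally` vs. `whenComplete` exchange, here is a minimal, self-contained sketch (not the PR's code; `ExceptionallyDemo` and its methods are hypothetical names) of why `exceptionally` needs a `return null` on a `CompletableFuture<Void>`: its function must produce a replacement value, and `null` is the only `Void` value. It also shows a behavioral difference: `whenComplete` passes the failure on to the stage it returns, while `exceptionally` recovers it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo contrasting whenComplete and exceptionally on a CompletableFuture<Void>. */
final class ExceptionallyDemo {

    /** Observes the error, but the returned stage is still completed exceptionally. */
    static CompletableFuture<Void> observeWithWhenComplete(CompletableFuture<Void> stage,
                                                           AtomicReference<Throwable> seen) {
        return stage.whenComplete((ignored, error) -> {
            if (error != null) {
                seen.set(error);
            }
        });
    }

    /** Observes the error and recovers: the returned stage completes normally with null. */
    static CompletableFuture<Void> observeWithExceptionally(CompletableFuture<Void> stage,
                                                            AtomicReference<Throwable> seen) {
        return stage.exceptionally(error -> {
            seen.set(error);
            return null; // a Function<Throwable, Void> must return a value, and null is the only Void
        });
    }
}
```

   Which one fits depends on whether later stages should still see the failure; when the error is only logged, as in the reconciliation chain discussed in this thread, either works.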



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -76,15 +83,14 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class HeartbeatRequestManagerTest {
-private long retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS;
-private int heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
-private int maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL_MS;
-private long retryBackoffMaxMs = DEFAULT_RETRY_BACKOFF_MAX_MS;
+private final long retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS;
+private final int heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
+private final int maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL_MS;
+private final long retryBackoffMaxMs = DEFAULT_RETRY_BACKOFF_MAX_MS;

Review Comment:
   Done



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ 

Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lianetm commented on PR #15511:
URL: https://github.com/apache/kafka/pull/15511#issuecomment-2000134008

   Thanks for the updates @lucasbru, took another look, LGTM considering the latest nits. 
   
   Just for the record, I will follow up with [KAFKA-16375](https://issues.apache.org/jira/browse/) on how ongoing reconciliations are discarded after rejoining.





Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526609264


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -29,6 +29,8 @@
 import org.apache.kafka.common.TopicPartition;

Review Comment:
   Agree it's not covered. (Happy to take that separately myself, right after 
this if it helps @lucasbru...knowing you're getting busy very soon) 






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526609264


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -29,6 +29,8 @@
 import org.apache.kafka.common.TopicPartition;

Review Comment:
   Agree it's not covered. (Happy to take that separately right after this if 
it helps @lucasbru...knowing you're getting busy very soon) 






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526609264


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -29,6 +29,8 @@
 import org.apache.kafka.common.TopicPartition;

Review Comment:
   Agree it's not covered. (Happy to take that separately right after this if 
you prefer @lucasbru...knowing you're getting busy very soon) 






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526605829


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -976,6 +974,10 @@ void maybeReconcile() {
 }
 
 revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+}).whenComplete((__, error) -> {
+if (error != null) {
+log.error("Reconciliation failed.", error);

Review Comment:
   Oh I see now, thanks for the explanation. 






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526605292


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -911,9 +911,13 @@ void maybeReconcile() {
 SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
 final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-if (resolvedAssignment.equals(currentAssignment)) {
-log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " +
-"is equal to the member current assignment.", resolvedAssignment);
+if (currentAssignment != LocalAssignmentImpl.NONE &&
+resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
+resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +

Review Comment:
   Agree on the need to update the assignment. I was pushing for exactly that: doing only what's needed (vs. doing everything the reconciliation does), so this sounds good to me. Thanks!






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


dajac commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526420387


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -970,7 +973,11 @@ void maybeReconcile() {
 log.debug("Auto-commit before reconciling new assignment completed successfully.");
 }
 
-revokeAndAssign(assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+}).whenComplete((__, error) -> {

Review Comment:
   nit: Could we use `exceptionally`?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -552,26 +553,23 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 
 // ServerAssignor - only sent if has changed since the last heartbeat
 this.membershipManager.serverAssignor().ifPresent(serverAssignor -> {
-if (!serverAssignor.equals(sentFields.serverAssignor)) {
+if (membershipManager.memberEpoch() == 0 || !serverAssignor.equals(sentFields.serverAssignor)) {
 data.setServerAssignor(serverAssignor);
 sentFields.serverAssignor = serverAssignor;
 }
 });
 
 // ClientAssignors - not supported yet
 
-// TopicPartitions - only sent if it has changed since the last heartbeat. Note that
-// the string consists of just the topic ID and the partitions. When an assignment is
-// received, we might not yet know the topic name, and then it is learnt subsequently
-// by a metadata update.
-TreeSet<String> assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
-.map(entry -> entry.getKey() + "-" + entry.getValue())
-.collect(Collectors.toCollection(TreeSet::new));
-if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+// TopicPartitions - sent with the first heartbeat after a new assignment from the server was
+// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
+// including its local epoch is changed (although the local epoch is not sent in the heartbeat).
+LocalAssignment local = membershipManager.currentAssignment();
+if (!local.equals(sentFields.localAssignment)) {

Review Comment:
   Do we need `membershipManager.memberEpoch() == 0` here too? I suppose that 
it works because the current assignment is reset in the membership manager but 
it may be better to add it here for consistency.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -29,6 +29,8 @@
 import org.apache.kafka.common.TopicPartition;

Review Comment:
   I wonder if we could add the example that we discussed offline as a test:
   ```
   0: [T1, T2] -- T2 unresolved (only T1 is reconciled)
   1: [T1, T2, T3] -- T2 unresolved (skipped, since reconciliation in progress)
   2: [T1, T2] -- T2 unresolved
   ```
   What do you think?
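   The three-step example can be traced with a toy model. This is a hypothetical sketch, not the membership manager: assignments are sets of topic names, `T1` and `T3` are assumed resolvable and `T2` is not, targets arrive via `onHeartbeat`, and reconciliation happens only on `poll`, so step 1 is overwritten before it is ever reconciled. The rule that the local epoch is bumped whenever the target changes, and the "nothing" result for a repeated poll, are assumptions of the model.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model: targets arrive with heartbeat responses and are reconciled only on poll. */
final class MemberModel {
    private final Set<String> resolvableTopics;
    private Set<String> target = Collections.emptySet();
    private long targetLocalEpoch = 0;
    private Set<String> current = Collections.emptySet();
    private long currentLocalEpoch = -1; // -1: nothing reconciled yet
    final List<String> actions = new ArrayList<>();

    MemberModel(Set<String> resolvableTopics) {
        this.resolvableTopics = resolvableTopics;
    }

    /** Assumption: the local epoch is bumped only if the received target differs from the previous one. */
    void onHeartbeat(Set<String> newTarget) {
        if (!newTarget.equals(target)) {
            target = newTarget;
            targetLocalEpoch++;
        }
    }

    /** Reconciles the resolvable fragment of the latest target; overwritten targets are never seen. */
    void poll() {
        Set<String> resolved = new TreeSet<>(target);
        resolved.retainAll(resolvableTopics);
        if (currentLocalEpoch != -1 && resolved.equals(current)) {
            if (currentLocalEpoch == targetLocalEpoch) {
                actions.add("nothing"); // already reconciled, still waiting on metadata
                return;
            }
            actions.add("ack-only"); // same partitions, newer target: skip listeners but still ack
        } else {
            actions.add("full:" + resolved); // revoke/assign and run listeners
        }
        current = resolved;
        currentLocalEpoch = targetLocalEpoch;
    }
}
```

   In this model, dropping the local epoch would make the second `poll` see resolved partitions `[T1]` equal to the current ones and do nothing, so the assignment from step 2 would never be acknowledged.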



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -185,4 +190,89 @@ public interface MembershipManager extends RequestManager {
  * releasing its assignment. This is expected to be used when the poll timer is reset.
  */
 void maybeRejoinStaleMember();
+
+/**
+ * A data structure to represent the current assignment, and current target assignment of a member in a consumer group.
+ *
+ * Besides the assigned partitions, it contains a local epoch that is bumped whenever the assignment changes, to ensure
+ * that two assignments with the same partitions but different local epochs are not considered equal.
+ */
+final class LocalAssignment {
+
+public static final long NONE_EPOCH = -1;
+
+public static final LocalAssignment NONE = new LocalAssignment(NONE_EPOCH, Collections.emptyMap());
+
+public final long localEpoch;
+
+public final Map<Uuid, SortedSet<Integer>> partitions;
+
+public LocalAssignment(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions) {
+this.localEpoch = localEpoch;
+this.partitions = partitions;
+if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
+throw new IllegalArgumentException("Local epoch must be set if there are partitions");
+}
+}
+
+public LocalAssignment(long localEpoch, SortedSet<TopicIdPartition> topicIdPartitions) {
+this.localEpoch = localEpoch;
+this.partitions = new HashMap<>();
+if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) {
+throw new IllegalArgumentException("Local epoch must be 

Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526349331


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
 
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining
+if (membershipManager.memberEpoch() == 0) {

Review Comment:
   Note that this was also implemented independently in https://github.com/apache/kafka/pull/15401, which I had seen but forgotten about. So I will rebase, and the epoch == 0 conditions will come from the other PR (in the form of an equivalent state == JOINING condition).






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526346873


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -911,9 +911,13 @@ void maybeReconcile() {
 SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
 final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-if (resolvedAssignment.equals(currentAssignment)) {
-log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " +
-"is equal to the member current assignment.", resolvedAssignment);
+if (currentAssignment != LocalAssignmentImpl.NONE &&
+resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
+resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +

Review Comment:
   @dajac and I discussed various options for implementing the short-cut. In the end, we decided to skip only the callbacks, not the entire reconciliation, when the resolved assignment is equal to the current assignment but has a different epoch. We still need to update the current assignment with the resolved assignment and the new epoch in order to trigger the "ack", and to transition to sending the ack as well.
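   The decision described above can be sketched as a small pure function. `SimpleLocalAssignment`, `ReconcileAction`, and `ShortCutSketch` are hypothetical names, partitions are simplified to `"topicId-partition"` strings, and the condition mirrors the `isNone()` variant quoted in this thread (without the `localEpoch + 1` bound). The local epoch does not enter the comparison itself; what matters is that, in both outcomes, the resolved assignment with its newer local epoch replaces the current one, so the next heartbeat acknowledges it.

```java
import java.util.Set;

/** Hypothetical, simplified local assignment: a local epoch plus "topicId-partition" strings. */
final class SimpleLocalAssignment {
    static final long NONE_EPOCH = -1;

    final long localEpoch;
    final Set<String> partitions;

    SimpleLocalAssignment(long localEpoch, Set<String> partitions) {
        this.localEpoch = localEpoch;
        this.partitions = partitions;
    }

    boolean isNone() {
        return localEpoch == NONE_EPOCH;
    }
}

/** What to do with the resolvable fragment of the target assignment. */
enum ReconcileAction {
    /** Same partitions as the current assignment: install the new epoch and ack, skip listeners. */
    UPDATE_AND_ACK_WITHOUT_CALLBACKS,
    /** Partitions differ: revoke/assign and run the rebalance listeners. */
    FULL_RECONCILIATION
}

final class ShortCutSketch {
    /** In both outcomes the caller is assumed to install {@code resolved} as the new current assignment. */
    static ReconcileAction decide(SimpleLocalAssignment resolved, SimpleLocalAssignment current) {
        if (!current.isNone() && resolved.partitions.equals(current.partitions)) {
            return ReconcileAction.UPDATE_AND_ACK_WITHOUT_CALLBACKS;
        }
        return ReconcileAction.FULL_RECONCILIATION;
    }
}
```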






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525940513


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -911,9 +911,13 @@ void maybeReconcile() {
 SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
 final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-if (resolvedAssignment.equals(currentAssignment)) {
-log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " +
-"is equal to the member current assignment.", resolvedAssignment);
+if (currentAssignment != LocalAssignmentImpl.NONE &&
+resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
+resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +

Review Comment:
   I didn't want to mention the local epoch here, since how intermediate assignments are detected is more of an implementation detail. But then I should log the local epoch, I suppose. Updated it accordingly.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -1028,9 +1028,9 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {
 
 verifyReconciliationNotTriggered(membershipManager);
 membershipManager.poll(time.milliseconds());
+membershipManager.onHeartbeatRequestSent();

Review Comment:
   Done



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2279,22 +2277,23 @@ private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscri
 return mockJoinAndReceiveAssignment(expectSubscriptionUpdated, createAssignment(expectSubscriptionUpdated));
 }
 
-private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscriptionUpdated,
+private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean triggerReconciliation,

Review Comment:
   Done



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -976,6 +974,10 @@ void maybeReconcile() {
 }
 
 revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+}).whenComplete((__, error) -> {
+if (error != null) {
+log.error("Reconciliation failed.", error);

Review Comment:
   Nope, the exception handling inside `revokeAndAssign` is only triggered if 
the revoke and assign future fails. If `revokeAndAssign` fails outside the 
future (in particular, inside `revokePartitions`, there is logic that may 
fail), the exception falls through to here and was silently swallowed before, 
which cost me an hour of debugging to find.
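   A self-contained sketch of the failure mode described above, with hypothetical names: `revokeAndAssign` here is a stand-in that throws before returning any future, mirroring the case where `revokePartitions` fails synchronously, and the shape of the chain (a `whenComplete` callback on the auto-commit result) is an assumption rather than the actual `MembershipManagerImpl` code. Because the throw happens inside a completion callback, `CompletableFuture` turns it into an exceptional completion of the next stage; if nothing observes that stage, the error is lost. The observing branch uses `exceptionally` with `return null`, the form agreed on elsewhere in this thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo: a synchronous throw inside a completion callback fails the next stage. */
final class SwallowedFailureDemo {

    /** Stand-in for revokeAndAssign that throws before it ever returns a future. */
    static CompletableFuture<Void> revokeAndAssign() {
        throw new IllegalStateException("failed outside the returned future");
    }

    /** Returns the error observed at the end of the chain, or null if none was observed. */
    static Throwable reconcile(boolean observeErrors) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CompletableFuture<Void> commitResult = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> chain = commitResult.whenComplete((ignored, commitError) -> {
            revokeAndAssign(); // throws; the future it would return is never created
        });
        if (observeErrors) {
            chain.exceptionally(error -> {
                seen.set(error); // the PR logs "Reconciliation failed." at this point
                return null;
            });
        }
        return seen.get();
    }
}
```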






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1524592939


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -185,4 +185,18 @@ public interface MembershipManager extends RequestManager {
  * releasing its assignment. This is expected to be used when the poll timer is reset.
  */
 void maybeRejoinStaleMember();
+
+/**
+ * A data structure to represent the current assignment, and current target assignment of a member in a consumer group.
+ *
+ * Besides the assigned partitions, it contains a local epoch that is bumped whenever the assignment changes, to ensure
+ * that two assignments with the same partitions but different local epochs are not considered equal.

Review Comment:
   Done (removed the interface)






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-15 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525912448


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || 
membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
 
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining
+if (membershipManager.memberEpoch() == 0) {

Review Comment:
   Summary of the Slack discussion: we need to resend a full request after a failure, to let the GC (group coordinator) know that it needs to send a full response (including repeating the assignment). The reason is that if a heartbeat request timed out, that heartbeat could have been the one carrying a new assignment for you. If you don't send a full request next, the server will assume that you've received that last response, so it won't deliver the new assignment again. So sending a full request is an implicit "force full response".
   
   I reverted that part of the changes (so I brought back rebalanceTimeoutMs and serverAssignor in sentFields). We could consider relying only on reset of the heartbeat state (resetting when transitioning to JOINING may be enough). However, that seems more brittle: it’s quite easy to introduce a code path where we forget to reset the heartbeat state. A contract like “always send when epoch==0” for the joining case is easier to reason about, in my eyes.
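The contract discussed above can be sketched as follows: send the full set of fields when joining, and also after any `reset` (for example following a request timeout). This is a hypothetical, simplified model, not the actual `HeartbeatRequestManager` code. The class `HeartbeatFieldsSketch`, its `Request` type and its method names are invented for illustration, and only three heartbeat fields are modeled.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical model of "sent fields" tracking in a heartbeat request builder.
final class HeartbeatFieldsSketch {

    // Simplified request; a null field means "omitted from this heartbeat".
    static final class Request {
        int memberEpoch;
        Integer rebalanceTimeoutMs;
        String serverAssignor;
        List<String> subscribedTopicNames;
    }

    // Values last sent to the group coordinator; null means "not sent since the last reset".
    private Integer sentRebalanceTimeoutMs;
    private String sentServerAssignor;
    private List<String> sentSubscribedTopicNames;

    // Called after any failure (e.g. a request timeout). Forgetting what was sent forces
    // the next request to be a full one, which the coordinator treats as an implicit
    // request for a full response (including the assignment).
    void reset() {
        sentRebalanceTimeoutMs = null;
        sentServerAssignor = null;
        sentSubscribedTopicNames = null;
    }

    Request build(int memberEpoch, int rebalanceTimeoutMs, String serverAssignor, List<String> topics) {
        Request request = new Request();
        request.memberEpoch = memberEpoch; // always sent
        boolean joining = memberEpoch == 0;

        // Each field is sent when joining, or if it has changed since the last heartbeat.
        if (joining || !Objects.equals(sentRebalanceTimeoutMs, rebalanceTimeoutMs)) {
            request.rebalanceTimeoutMs = rebalanceTimeoutMs;
            sentRebalanceTimeoutMs = rebalanceTimeoutMs;
        }
        if (joining || !Objects.equals(sentServerAssignor, serverAssignor)) {
            request.serverAssignor = serverAssignor;
            sentServerAssignor = serverAssignor;
        }
        if (joining || !Objects.equals(sentSubscribedTopicNames, topics)) {
            request.subscribedTopicNames = topics;
            sentSubscribedTopicNames = topics;
        }
        return request;
    }
}
```

The `joining` check provides the "always send when epoch==0" guarantee without relying on every code path calling `reset()`, while `reset()` still covers the error case.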






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-14 Thread via GitHub


lianetm commented on PR #15511:
URL: https://github.com/apache/kafka/pull/15511#issuecomment-1998471082

   Thanks for the updates @lucasbru, left some minor comments.





Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-14 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525452774


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -976,6 +974,10 @@ void maybeReconcile() {
 }
 
 revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+}).whenComplete((__, error) -> {
+if (error != null) {
+log.error("Reconciliation failed.", error);

Review Comment:
   Is there a reason why we want this log here? We already have the same log inside `revokeAndAssign`, when reconciliation completes.






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-14 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525446624


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -1028,9 +1028,9 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {
 
 verifyReconciliationNotTriggered(membershipManager);
 membershipManager.poll(time.milliseconds());
+membershipManager.onHeartbeatRequestSent();

Review Comment:
   I would suggest we add the check that a reconciliation was triggered here, just adding `verifyReconciliationTriggeredAndCompleted(membershipManager, Collections.emptyList());` right after poll. It's part of what this PR is introducing, and it completes the picture of what happens when getting the first (empty) assignment that can be reconciled.






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-14 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525418611


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2279,22 +2277,23 @@ private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscri
 return mockJoinAndReceiveAssignment(expectSubscriptionUpdated, createAssignment(expectSubscriptionUpdated));
 }
 
-private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscriptionUpdated,
+private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean triggerReconciliation,

Review Comment:
   Agree with the renamed param, but could we also update it in the overloaded method above, just for consistency?






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-14 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525386165


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -911,9 +911,13 @@ void maybeReconcile() {
 SortedSet assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
 final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-if (resolvedAssignment.equals(currentAssignment)) {
-log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " +
-"is equal to the member current assignment.", resolvedAssignment);
+if (currentAssignment != LocalAssignmentImpl.NONE &&
+resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
+resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +

Review Comment:
   Just to consider simplifying the log and clarifying the situation: isn't the message here simply that we're ignoring the reconciliation because the resolved target is equal to the current assignment? I get the point about intermediate assignments, but an intermediate one would have updated the current assignment, so it wouldn't be equal to the resolved target (or it would leave a reconciliation in progress, so we wouldn't even reach this check).






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-14 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525360383


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -988,7 +989,8 @@ long getExpirationTimeForTimeout(final long timeoutMs) {
  * then complete the reconciliation by updating the assignment and making the appropriate state
  * transition. Note that if any of the 2 callbacks fails, the reconciliation should fail.
  */
-private void revokeAndAssign(SortedSet assignedTopicIdPartitions,
+private void revokeAndAssign(LocalAssignmentImpl resolvedAssignment,

Review Comment:
   It was this same one, [KAFKA-16185](https://issues.apache.org/jira/browse/KAFKA-16185), that was meant to handle the situation around discarding reconciliations (which is what that rejoin check is for), but I had missed it in the review too. I just created [KAFKA-16375](https://issues.apache.org/jira/browse/KAFKA-16375) to handle it in a follow-up PR (probably moving away from epochs altogether to cover that edge case, and identifying the rejoin simply by the member going through a transition to join).
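One way to read the follow-up idea is sketched below: identify a rejoin by the member going through a transition to join, rather than by comparing member epochs. Everything here is hypothetical. `RejoinTrackerSketch`, its fields and its methods are illustrative names, not code from this PR or from KAFKA-16375.

```java
// Hypothetical sketch: detect a rejoin during a reconciliation by counting transitions
// to JOINING, instead of comparing member epochs (which can change without leaving).
final class RejoinTrackerSketch {
    private int memberEpoch;
    private long joinTransitions;

    void transitionToJoining() {
        joinTransitions++;
        memberEpoch = 0;
    }

    // The coordinator may bump the member epoch while the member stays in the group.
    void onHeartbeatResponse(int newMemberEpoch) {
        memberEpoch = newMemberEpoch;
    }

    int memberEpoch() {
        return memberEpoch;
    }

    // Snapshot taken when a reconciliation starts.
    long joinTransitionsSnapshot() {
        return joinTransitions;
    }

    // Counter-based check: true only if the member went through JOINING since the snapshot.
    boolean memberHasRejoined(long snapshot) {
        return joinTransitions != snapshot;
    }

    // Epoch-based check, as quoted in the review: also true after a plain epoch bump.
    boolean memberHasRejoinedByEpoch(int memberEpochOnReconciliationStart) {
        return memberEpochOnReconciliationStart != memberEpoch;
    }
}
```

Suppose a reconciliation starts at epoch 5 and a heartbeat response then bumps the epoch to 6. In that case `memberHasRejoinedByEpoch(5)` returns `true` even though the member never left. By contrast, `memberHasRejoined(snapshot)` stays `false` until `transitionToJoining()` is called.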






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-14 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1524575611


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1497,4 +1502,93 @@ public PollResult poll(final long currentTimeMs) {
 List stateListeners() {
 return unmodifiableList(stateUpdatesListeners);
 }
+
+private final static class LocalAssignmentImpl implements LocalAssignment {
+
+private static final long NONE_EPOCH = -1;
+
+private static final LocalAssignmentImpl NONE = new LocalAssignmentImpl(NONE_EPOCH, Collections.emptyMap());
+
+private final long localEpoch;
+
+private final Map> partitions;
+
+public LocalAssignmentImpl(long localEpoch, Map> partitions) {
+this.localEpoch = localEpoch;
+this.partitions = partitions;
+if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
+throw new IllegalArgumentException("Local epoch must be set if there are partitions");
+}
+}
+
+public LocalAssignmentImpl(long localEpoch, SortedSet topicIdPartitions) {
+this.localEpoch = localEpoch;
+this.partitions = new HashMap<>();
+if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) {
+throw new IllegalArgumentException("Local epoch must be set if there are partitions");
+}
+topicIdPartitions.forEach(topicIdPartition -> {
+Uuid topicId = topicIdPartition.topicId();
+partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition());
+});
+}
+
+@Override
+public String toString() {
+return "{" +
+"localEpoch=" + localEpoch +
+", partitions=" + partitions +
+'}';
+}
+
+@Override
+public boolean equals(final Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+final LocalAssignmentImpl that = (LocalAssignmentImpl) o;
+return localEpoch == that.localEpoch && Objects.equals(partitions, that.partitions);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(localEpoch, partitions);
+}
+
+@Override
+public Map> getPartitions() {
+return partitions;
+}
+
+@Override
+public boolean isNone() {
+return localEpoch == NONE_EPOCH;
+}
+
+Optional updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+
+// Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection
+if (localEpoch != NONE_EPOCH) {
+// check if the new assignment is different from the current target assignment

Review Comment:
   Done



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
 
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining
+if (membershipManager.memberEpoch() == 0) {

Review Comment:
   Done



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -1952,19 +1942,22 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
 // Should reset epoch to leave the group and release the assignment (right away because
 // there is no onPartitionsLost callback defined)
 verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
-assertTrue(membershipManager.currentAssignment().isEmpty());
+ 

Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-14 Thread via GitHub


dajac commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1524608939


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
 
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining
+if (membershipManager.memberEpoch() == 0) {

Review Comment:
   I had a second look at all this logic and I actually wonder if it is better to keep the previous logic for rebalanceTimeoutMs and serverAssignor. My concern is that we should actually send out a full request when `reset` has been called, and with the change that is no longer the case. The main idea was to send a full request when joining or on any error (e.g. request timeout).






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-14 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1524569366


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -530,19 +530,12 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);

Review Comment:
   Ah, I didn't see that one, so the change will go away with a merge.






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-14 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1524568814


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -889,43 +914,36 @@ private void transitionToStale() {
  */
 void maybeReconcile() {
 if (targetAssignmentReconciled()) {
-log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
 "current assignment.");
 return;
 }
 if (reconciliationInProgress) {
-log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
 currentTargetAssignment + " will be handled in the next reconciliation loop.");
 return;
 }
 
 // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
 // if some topic IDs are not resolvable.
 SortedSet assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-SortedSet ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-ownedPartitions.addAll(subscriptions.assignedPartitions());
-
-// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
-// being reconciled. Needed for interactions with the centralized subscription state that
-// does not support topic IDs yet, and for the callbacks.
-SortedSet assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions);
-
-// Check same assignment. Based on topic names for now, until topic IDs are properly
-// supported in the centralized subscription state object. Note that this check is
-// required to make sure that reconciliation is not triggered if the assignment ready to
-// be reconciled is the same as the current one (even though the member may remain
-// in RECONCILING state if it has some unresolved assignments).
-boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions);
-
-if (sameAssignmentReceived) {
+if (resolvedAssignment.equals(currentAssignment)) {

Review Comment:
   Okay. I will short-cut the reconciliation if the resolvable assignment did not change and the local target epoch either did not change or was bumped once (the latter being your example). In the latter case, I'll still bump the local current epoch.
   
   It's a bit more special-casing than I was hoping for, but I see that if these "delayed topic names" are common, it'd be annoying to get two reconciliations every time.
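The short-cut rule described above can be sketched as a small predicate. This is a simplified illustration, not the PR's code. The class `ReconcileShortcutSketch` is hypothetical, and partitions are modeled as `"topic-partition"` strings. The condition follows the one in the PR diff: the current assignment is not `NONE`, `resolved.localEpoch <= current.localEpoch + 1`, and the partitions are equal. When the short-cut applies, the current local epoch is still bumped.

```java
import java.util.Set;

// Hypothetical sketch of the reconciliation short-cut based on local epochs.
final class ReconcileShortcutSketch {
    static final long NONE_EPOCH = -1L;

    long currentEpoch = NONE_EPOCH;
    Set<String> currentPartitions = Set.of();

    /**
     * Returns true if a full reconciliation (with rebalance callbacks) must run for the
     * resolvable fragment of the target assignment. When the short-cut applies, only the
     * local current epoch is bumped and no callbacks are run.
     */
    boolean needsReconciliation(long resolvedEpoch, Set<String> resolvedPartitions) {
        boolean unchangedFragment = currentEpoch != NONE_EPOCH
            && resolvedEpoch <= currentEpoch + 1
            && resolvedPartitions.equals(currentPartitions);
        if (unchangedFragment) {
            currentEpoch = resolvedEpoch;
            return false;
        }
        return true;
    }

    // Called when a full reconciliation completes.
    void completeReconciliation(long resolvedEpoch, Set<String> resolvedPartitions) {
        currentEpoch = resolvedEpoch;
        currentPartitions = Set.copyOf(resolvedPartitions);
    }
}
```

Take the example from the review. The member owns [t1-1] at local epoch 0 and receives the target [t1-1, t2-1] at local epoch 1 while t2's name is still unknown. The resolvable fragment is [t1-1] at epoch 1, so `needsReconciliation` returns `false` and only the epoch moves to 1. Once t2 is resolved, the fragment differs and a single reconciliation runs.

If the target epoch had jumped by two, for example because an intermediate assignment was received in between, the same partitions would still trigger a reconciliation.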






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-14 Thread via GitHub


dajac commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1524430731


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
 
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining
+if (membershipManager.memberEpoch() == 0) {

Review Comment:
   While we are here, we may be able to do the same for the `serverAssignor` field, as it never changes at runtime.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -185,4 +185,18 @@ public interface MembershipManager extends RequestManager {
  * releasing its assignment. This is expected to be used when the poll timer is reset.
  */
 void maybeRejoinStaleMember();
+
+/**
+ * A data structure to represent the current assignment, and current target assignment of a member in a consumer group.
+ *
+ * Besides the assigned partitions, it contains a local epoch that is bumped whenever the assignment changes, to ensure
+ * that two assignments with the same partitions but different local epochs are not considered equal.
+ */
+interface LocalAssignment {
+
+Map> getPartitions();

Review Comment:
   nit: We usually don't prefix getters with `get`.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -566,18 +560,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 
 // ClientAssignors - not supported yet
 
-// TopicPartitions - only sent if it has changed since the last heartbeat. Note that
-// the string consists of just the topic ID and the partitions. When an assignment is
-// received, we might not yet know the topic name, and then it is learnt subsequently
-// by a metadata update.
-TreeSet assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
-.map(entry -> entry.getKey() + "-" + entry.getValue())
-.collect(Collectors.toCollection(TreeSet::new));
-if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+// TopicPartitions - sent with the first heartbeat after a new assignment from the server was
+// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
+// including its local epoch is changed (although the local epoch is not sent in the heartbeat).
+LocalAssignment local = membershipManager.currentAssignment();
+if (local == null) {

Review Comment:
   nit: We could use `isNone`.
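The resend rule in the new `TopicPartitions` comment can be sketched as below, using an `isNone()` check as suggested: resend whenever the local assignment, including its local epoch, changes. This is a hypothetical illustration. `TopicPartitionsFieldSketch` and its record are invented names, and partitions are `"topic-partition"` strings. Omitting the field for a `NONE` assignment is an assumption, since the body of the `if` is not shown in the diff.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of when the TopicPartitions heartbeat field is (re)sent.
final class TopicPartitionsFieldSketch {

    // Record equality covers both components, so the local epoch takes part in the comparison.
    record LocalAssignment(long localEpoch, Set<String> partitions) {
        static final LocalAssignment NONE = new LocalAssignment(-1L, Set.of());

        boolean isNone() {
            return localEpoch == -1L;
        }
    }

    // Last local assignment whose partitions were sent; null after a reset.
    private LocalAssignment sentAssignment;

    void reset() {
        sentAssignment = null;
    }

    /** Returns the partitions to put in the heartbeat, or null to omit the field. */
    List<String> topicPartitionsField(LocalAssignment local) {
        if (local.isNone()) {
            return null; // assumption: nothing reconciled yet, so nothing to report
        }
        if (local.equals(sentAssignment)) {
            return null; // same partitions and same local epoch: already sent
        }
        sentAssignment = local;
        // Only the sorted partitions are sent; the local epoch itself is never sent.
        return List.copyOf(new TreeSet<>(local.partitions()));
    }
}
```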



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -988,7 +989,8 @@ long getExpirationTimeForTimeout(final long timeoutMs) {
  * then complete the reconciliation by updating the assignment and making the appropriate state
  * transition. Note that if any of the 2 callbacks fails, the reconciliation should fail.
  */
-private void revokeAndAssign(SortedSet assignedTopicIdPartitions,
+private void revokeAndAssign(LocalAssignmentImpl resolvedAssignment,

Review Comment:
   btw, I just noticed the `boolean memberHasRejoined = 
memberEpochOnReconciliationStart != memberEpoch` condition while reading the 
code again. We have it in two places. I think that this is wrong because the 
member epoch could effectively change without leaving. @lianetm I recall that 
we discussed this a while ago. Do we have a jira to address this?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -1952,19 +1942,22 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
 // Should 

Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on PR #15511:
URL: https://github.com/apache/kafka/pull/15511#issuecomment-1995835817

   hey @lucasbru, I took another full pass, left a few comments. Thanks for the 
changes!





Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523858796


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -889,43 +914,36 @@ private void transitionToStale() {
  */
 void maybeReconcile() {
 if (targetAssignmentReconciled()) {
-log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
 "current assignment.");
 return;
 }
 if (reconciliationInProgress) {
-log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
 currentTargetAssignment + " will be handled in the next reconciliation loop.");
 return;
 }
 
 // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
 // if some topic IDs are not resolvable.
 SortedSet assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-SortedSet ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-ownedPartitions.addAll(subscriptions.assignedPartitions());
-
-// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
-// being reconciled. Needed for interactions with the centralized subscription state that
-// does not support topic IDs yet, and for the callbacks.
-SortedSet assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions);
-
-// Check same assignment. Based on topic names for now, until topic IDs are properly
-// supported in the centralized subscription state object. Note that this check is
-// required to make sure that reconciliation is not triggered if the assignment ready to
-// be reconciled is the same as the current one (even though the member may remain
-// in RECONCILING state if it has some unresolved assignments).
-boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions);
-
-if (sameAssignmentReceived) {
+if (resolvedAssignment.equals(currentAssignment)) {

Review Comment:
   Well, I was expecting that we only need to trigger the callbacks if the assignment changed (it could change to empty, but something needs to change), and that's not the case if the member ends up with t1-1 again, which it already owns.
   
   By running a full reconciliation when the resolved assignment is the same as the current one but received later, we end up with a client reconciling the exact same assignment it already owns :S I expect it would turn out noisy, accounting for 2 rebalances in what are probably much more common cases, where a newly assigned topic is temporarily not in metadata and then discovered. Say the member owns [t1-1] and receives assignment [t1-1, t2-1] with missing metadata for t2 (t2 is discovered shortly after). Wouldn't we generate 2 rebalances (a 1st one with no change in the assignment, and a 2nd one with the added topic once discovered), when things truly only changed once?
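The scenario above hinges on how the resolvable fragment of a target assignment is computed. Below is a minimal, hypothetical sketch, not the actual `findResolvableAssignmentAndTriggerMetadataUpdate`. Topic IDs and names are plain strings, and the unresolved topic IDs are collected into a set instead of triggering a metadata update.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch of splitting a target assignment into resolvable and unresolved parts.
final class ResolvableAssignmentSketch {

    /**
     * Returns the partitions whose topic name is known, as "name-partition" strings.
     * Topic IDs without a known name are added to {@code unresolvedTopicIds}; the real
     * client would request a metadata update for them.
     */
    static SortedSet<String> resolvable(Map<String, SortedSet<Integer>> targetByTopicId,
                                        Map<String, String> topicNamesById,
                                        SortedSet<String> unresolvedTopicIds) {
        SortedSet<String> resolved = new TreeSet<>();
        for (Map.Entry<String, SortedSet<Integer>> entry : targetByTopicId.entrySet()) {
            String topicName = topicNamesById.get(entry.getKey());
            if (topicName == null) {
                unresolvedTopicIds.add(entry.getKey());
                continue;
            }
            for (int partition : entry.getValue()) {
                resolved.add(topicName + "-" + partition);
            }
        }
        return resolved;
    }
}
```

In the example from the comment, the target is {id1: [1], id2: [1]} while metadata only maps id1 to t1. The resolvable fragment is therefore [t1-1], exactly what the member already owns, and id2 stays pending. A check that compares only partitions would skip reconciling at this point; a check that also compares local epochs would not.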






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523903844


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -1952,19 +1942,22 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
 // Should reset epoch to leave the group and release the assignment (right away because
 // there is no onPartitionsLost callback defined)
 verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
-assertTrue(membershipManager.currentAssignment().isEmpty());
+assertTrue(membershipManager.currentAssignment().isNone());
 assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
 assertEquals(LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
 }
 
 @Test
 public void testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {

Review Comment:
   I cannot find any test for one of the 2 issues we're after with this PR: ensuring that we trigger the callbacks when joining and getting an empty assignment. Could we add it? I expect it to be very similar to this one, but providing a `CounterConsumerRebalanceListener` and asserting that `onPartitionsAssigned` was called.






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523885245


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
 
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining
+if (membershipManager.memberEpoch() == 0) {
 data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
-sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs;
 }
 
 if (!this.subscriptions.hasPatternSubscription()) {
-// SubscribedTopicNames - only sent if has changed since the 
last heartbeat
+// SubscribedTopicNames - only sent when joining or if has 
changed since the last heartbeat

Review Comment:
   inherited, but now that we're here: "or if **it** has changed..."






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523863939


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -683,7 +681,10 @@ public void testDelayedMetadataUsedToCompleteAssignment() {
 receiveAssignment(newAssignment, membershipManager);
 membershipManager.poll(time.milliseconds());
 
-verifyReconciliationNotTriggered(membershipManager);
+// We bumped the local epoch, so new reconciliation is triggered

Review Comment:
   This newly triggered reconciliation is an example of what I was referring to 
in the [comment](https://github.com/apache/kafka/pull/15511/files#r1523858796) 
above: it seems to complicate the flow with 2 rebalances instead of 1. I 
wouldn't expect a member to rebalance/reconcile at this point (no assignment 
change for it yet, t2 not in metadata), and then reconcile/rebalance again 
once it discovers t2.






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523880045


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
  */
 private void replaceTargetAssignmentWithNewAssignment(
 ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+
+// Return if the same as current assignment; comparison without creating a new collection
+if (currentTargetAssignment != null) {
+// check if the new assignment is different from the current target assignment
+if (currentTargetAssignment.partitions.size() == assignment.topicPartitions().size() &&
+assignment.topicPartitions().stream().allMatch(
+tp -> currentTargetAssignment.partitions.containsKey(tp.topicId()) &&
+currentTargetAssignment.partitions.get(tp.topicId()).size() == tp.partitions().size() &&
+currentTargetAssignment.partitions.get(tp.topicId()).containsAll(tp.partitions())))) {
+return;
+}
+}
+
+// Bump local epoch and replace assignment
+long nextLocalEpoch;
+if (currentTargetAssignment == null) {
+nextLocalEpoch = 0;
+} else {
+nextLocalEpoch = currentTargetAssignment.localEpoch + 1;
+}

Review Comment:
   I was wrong in thinking that we could reuse the server epoch here: it does not 
truly represent new assignments for the member, and we could end up 
receiving multiple new assignments in the same server epoch. So I'm OK with this 
local epoch approach; it's the best way to keep track of "versions" of the 
assignments received.
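To make the local epoch idea concrete, here is a minimal sketch, assuming a simplified model rather than the PR's actual `LocalAssignmentImpl`: an immutable assignment carries a local epoch, and `updateWith` returns a new version with the epoch bumped only when the received partitions differ from the ones already held, using the same in-place size/`containsAll` comparison as the diff above. The names `VersionedAssignment` and `ReceivedTopic` are hypothetical, and topic IDs are plain `String`s instead of Kafka's `Uuid`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-in for one entry of the heartbeat response's topic partitions.
// Topic IDs are Strings here; Kafka itself uses Uuid.
final class ReceivedTopic {
    final String topicId;
    final List<Integer> partitions;

    ReceivedTopic(String topicId, List<Integer> partitions) {
        this.topicId = topicId;
        this.partitions = partitions;
    }
}

// Hypothetical, simplified local-epoch-versioned assignment (not Kafka's class).
final class VersionedAssignment {
    static final long NONE_EPOCH = -1;
    static final VersionedAssignment NONE =
        new VersionedAssignment(NONE_EPOCH, Collections.emptyMap());

    final long localEpoch;
    final Map<String, SortedSet<Integer>> partitions;

    VersionedAssignment(long localEpoch, Map<String, SortedSet<Integer>> partitions) {
        if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
            throw new IllegalArgumentException("Local epoch must be set if there are partitions");
        }
        this.localEpoch = localEpoch;
        this.partitions = partitions;
    }

    // True if the received topics hold exactly the partitions we already have,
    // checked in place without building a new collection.
    private boolean sameAs(List<ReceivedTopic> received) {
        return partitions.size() == received.size()
            && received.stream().allMatch(tp -> {
                SortedSet<Integer> held = partitions.get(tp.topicId);
                return held != null
                    && held.size() == tp.partitions.size()
                    && held.containsAll(tp.partitions);
            });
    }

    // Empty if nothing changed; otherwise a new version with the local epoch bumped by one
    // (NONE has epoch -1, so the first real assignment gets local epoch 0).
    Optional<VersionedAssignment> updateWith(List<ReceivedTopic> received) {
        if (localEpoch != NONE_EPOCH && sameAs(received)) {
            return Optional.empty();
        }
        Map<String, SortedSet<Integer>> copy = new TreeMap<>();
        for (ReceivedTopic tp : received) {
            copy.computeIfAbsent(tp.topicId, k -> new TreeSet<>()).addAll(tp.partitions);
        }
        return Optional.of(new VersionedAssignment(localEpoch + 1, copy));
    }
}
```

Under this model, receiving the same partitions again produces no new version, so only changes to this member's own partitions move the local epoch.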






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523863939


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -683,7 +681,10 @@ public void testDelayedMetadataUsedToCompleteAssignment() {
 receiveAssignment(newAssignment, membershipManager);
 membershipManager.poll(time.milliseconds());
 
-verifyReconciliationNotTriggered(membershipManager);
+// We bumped the local epoch, so new reconciliation is triggered

Review Comment:
   This newly triggered reconciliation is an example of what I was referring to 
in the [comment](https://github.com/apache/kafka/pull/15511/files#r1523858796) 
above: it seems to complicate the flow with 2 rebalances instead of 1. I 
wouldn't expect a member to rebalance/reconcile at this point (nothing changed 
for it yet, t2 not in metadata), and then reconcile/rebalance again once it 
discovers t2.






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523858796


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -889,43 +914,36 @@ private void transitionToStale() {
  */
 void maybeReconcile() {
 if (targetAssignmentReconciled()) {
-log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
 "current assignment.");
 return;
 }
 if (reconciliationInProgress) {
-log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
 currentTargetAssignment + " will be handled in the next reconciliation loop.");
 return;
 }
 
 // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
 // if some topic IDs are not resolvable.
 SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-ownedPartitions.addAll(subscriptions.assignedPartitions());
-
-// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
-// being reconciled. Needed for interactions with the centralized subscription state that
-// does not support topic IDs yet, and for the callbacks.
-SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions);
-
-// Check same assignment. Based on topic names for now, until topic IDs are properly
-// supported in the centralized subscription state object. Note that this check is
-// required to make sure that reconciliation is not triggered if the assignment ready to
-// be reconciled is the same as the current one (even though the member may remain
-// in RECONCILING state if it has some unresolved assignments).
-boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions);
-
-if (sameAssignmentReceived) {
+if (resolvedAssignment.equals(currentAssignment)) {

Review Comment:
   Well, I was expecting that we only need to trigger the callbacks if the 
assignment changed (it could change to empty, but something needs to change), 
and that's not the case if the member ends up with t1-1 again, which it already 
owns. 
   
   By running a full reconciliation when the resolved assignment is the same as 
the current one but received later, we end up with a client reconciling the 
exact same assignment it already owns :S I expect it would turn out noisy, 
accounting for 2 rebalances in what are probably much more common cases, where 
a newly assigned topic is temporarily not in metadata and then discovered: the 
member owns [t1-1] and receives assignment [t1-1, t2-1] with missing metadata 
for t2 (t2 discovered shortly after). We would generate 2 rebalances (a 1st one 
with no changes in assignment, a 2nd one with the added topic once discovered) 
when truly things only changed once. Thoughts?
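The two-rebalance concern can be illustrated with a small, hypothetical simulation (not Kafka code): the member owns `t1-1`, receives `[t1-1, t2-1]` under a new local epoch while `t2` is missing from metadata, and later discovers `t2`. It counts how often the rebalance listener would run if reconciliation is triggered whenever the (local epoch, resolved partitions) pair changes, versus only when the resolved partitions differ from the owned ones. Partitions are modeled as strings such as `"t1-1"`, and all names are illustrative.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical simulation; partitions are strings like "t1-1" (topic "t1", partition 1).
final class RebalanceCountSimulation {

    // Keep only the target partitions whose topic is already known from metadata.
    static Set<String> resolve(Set<String> target, Set<String> knownTopics) {
        Set<String> resolved = new TreeSet<>();
        for (String tp : target) {
            String topic = tp.substring(0, tp.lastIndexOf('-'));
            if (knownTopics.contains(topic)) {
                resolved.add(tp);
            }
        }
        return resolved;
    }

    // epochAware=true: reconcile whenever (local epoch, resolved partitions) differs from what is owned.
    // epochAware=false: reconcile only when the resolved partitions differ from what is owned.
    static int countListenerRuns(boolean epochAware) {
        Set<String> owned = new TreeSet<>(List.of("t1-1"));
        long ownedEpoch = 0;
        Set<String> target = new TreeSet<>(List.of("t1-1", "t2-1"));
        long targetEpoch = 1; // the new assignment bumped the local epoch
        int runs = 0;

        // Metadata step 1: t2 unknown. Step 2: t2 discovered.
        List<Set<String>> knownTopicsPerStep = List.of(Set.of("t1"), Set.of("t1", "t2"));
        for (Set<String> knownTopics : knownTopicsPerStep) {
            Set<String> resolved = resolve(target, knownTopics);
            boolean partitionsChanged = !resolved.equals(owned);
            boolean epochChanged = targetEpoch != ownedEpoch;
            if (partitionsChanged || (epochAware && epochChanged)) {
                runs++; // listener callbacks would run here
                owned = resolved;
                ownedEpoch = targetEpoch;
            }
        }
        return runs;
    }
}
```

Under these assumptions `countListenerRuns(true)` returns 2 and `countListenerRuns(false)` returns 1. Whether the extra run is noise or a legitimate rebalance signal is the open question in this thread.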






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523734711


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1497,4 +1502,93 @@ public PollResult poll(final long currentTimeMs) {
 List stateListeners() {
 return unmodifiableList(stateUpdatesListeners);
 }
+
+private final static class LocalAssignmentImpl implements LocalAssignment {
+
+private static final long NONE_EPOCH = -1;
+
+private static final LocalAssignmentImpl NONE = new LocalAssignmentImpl(NONE_EPOCH, Collections.emptyMap());
+
+private final long localEpoch;
+
+private final Map<Uuid, SortedSet<Integer>> partitions;
+
+public LocalAssignmentImpl(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions) {
+this.localEpoch = localEpoch;
+this.partitions = partitions;
+if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
+throw new IllegalArgumentException("Local epoch must be set if there are partitions");
+}
+}
+
+public LocalAssignmentImpl(long localEpoch, SortedSet<TopicIdPartition> topicIdPartitions) {
+this.localEpoch = localEpoch;
+this.partitions = new HashMap<>();
+if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) {
+throw new IllegalArgumentException("Local epoch must be set if there are partitions");
+}
+topicIdPartitions.forEach(topicIdPartition -> {
+Uuid topicId = topicIdPartition.topicId();
+partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition());
+});
+}
+
+@Override
+public String toString() {
+return "{" +
+"localEpoch=" + localEpoch +
+", partitions=" + partitions +
+'}';
+}
+
+@Override
+public boolean equals(final Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+final LocalAssignmentImpl that = (LocalAssignmentImpl) o;
+return localEpoch == that.localEpoch && Objects.equals(partitions, that.partitions);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(localEpoch, partitions);
+}
+
+@Override
+public Map<Uuid, SortedSet<Integer>> getPartitions() {
+return partitions;
+}
+
+@Override
+public boolean isNone() {
+return localEpoch == NONE_EPOCH;
+}
+
+Optional updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+
+// Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection
+if (localEpoch != NONE_EPOCH) {
+// check if the new assignment is different from the current target assignment

Review Comment:
   nit: here we are checking if the new assignment is "the same as" the current 
one (not different), so maybe even just remove this comment, as the one above 
seems enough?






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523709384


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -530,19 +530,12 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);

Review Comment:
   yes, that's the current expectation, so looks good. Let's just update the 
comment






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523523053


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 
 // ClientAssignors - not supported yet
 
-// TopicPartitions - only sent if it has changed since the last heartbeat. Note that
-// the string consists of just the topic ID and the partitions. When an assignment is
-// received, we might not yet know the topic name, and then it is learnt subsequently
-// by a metadata update.
-TreeSet<String> assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
-.map(entry -> entry.getKey() + "-" + entry.getValue())
-.collect(Collectors.toCollection(TreeSet::new));
-if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+// TopicPartitions - sent with the first heartbeat after a new assignment from the server was
+// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
+// including its local epoch is changed (although the local epoch is not sent in the heartbeat).
+LocalAssignment local = membershipManager.currentAssignment();
+if (local == null) {
+data.setTopicPartitions(Collections.emptyList());
+sentFields.topicPartitions = null;
+} else if (!local.equals(sentFields.topicPartitions)) {

Review Comment:
   Renamed the field
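The resend rule in the `TopicPartitions` comment above can be sketched as follows, under simplifying assumptions: this hypothetical `SentFieldsTracker` (not Kafka's `sentFields`) remembers the local epoch and partitions last included in a heartbeat and includes the field again whenever either changes, so a new local epoch with the same partitions still produces a resend, which acts as the ack. The `local == null` branch of the diff is omitted, and partitions are plain strings.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical tracker for the TopicPartitions heartbeat field.
final class SentFieldsTracker {
    private Long sentEpoch;             // local epoch last sent; null if nothing sent yet
    private Set<String> sentPartitions; // partitions last sent

    // Returns the partitions to include in this heartbeat, or null to omit the field.
    Set<String> topicPartitionsToSend(long localEpoch, Set<String> partitions) {
        boolean unchanged = sentEpoch != null
            && sentEpoch == localEpoch
            && partitions.equals(sentPartitions);
        if (unchanged) {
            return null;
        }
        sentEpoch = localEpoch;
        sentPartitions = new TreeSet<>(partitions);
        return Collections.unmodifiableSet(sentPartitions);
    }
}
```

Comparing the epoch as well as the partitions is what lets an unchanged partition set still be reported once per new local assignment.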






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523522542


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 }
 });
 
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat
+if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {

Review Comment:
   Done



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
  */
 private void replaceTargetAssignmentWithNewAssignment(
 ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+

Review Comment:
   Implemented both changes






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523522030


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -210,12 +213,12 @@ public class MembershipManagerImpl implements MembershipManager {
 private final Map assignedTopicNamesCache;
 
 /**
- * Topic IDs and partitions received in the last target assignment. Items are added to this set
- * every time a target assignment is received. This is where the member collects the assignment
- * received from the broker, even though it may not be ready to fully reconcile due to missing
- * metadata.
+ * Topic IDs and partitions received in the last target assignment, together with its local epoch.
+ *
+ * This member reassigned every time a new assignment is received.

Review Comment:
   Fixed






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523520659


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -530,19 +530,12 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);

Review Comment:
   I updated the instance ID to be sent all the time when present. I think the 
latest information was that the GC relies on this.
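A simplified sketch of the field-inclusion rules in the `buildRequestData()` diffs above, following the later revision: `MemberEpoch` always, `InstanceId` whenever a group instance ID is present, and `RebalanceTimeoutMs` only when joining (member epoch 0). `HeartbeatFields` is a hypothetical holder, not Kafka's `ConsumerGroupHeartbeatRequestData`, and `null` stands for "field not sent".

```java
import java.util.Optional;

// Hypothetical, simplified heartbeat payload; null fields are "not sent".
final class HeartbeatFields {
    int memberEpoch;
    String instanceId;
    Integer rebalanceTimeoutMs;

    static HeartbeatFields build(int memberEpoch, Optional<String> groupInstanceId, int rebalanceTimeoutMs) {
        HeartbeatFields data = new HeartbeatFields();
        // MemberEpoch - always sent
        data.memberEpoch = memberEpoch;
        // InstanceId - sent on every heartbeat when the member is static
        data.instanceId = groupInstanceId.orElse(null);
        // RebalanceTimeoutMs - only sent when joining
        if (memberEpoch == 0) {
            data.rebalanceTimeoutMs = rebalanceTimeoutMs;
        }
        return data;
    }
}
```

Sending the instance ID unconditionally drops the need to track it in the sent-fields state, at the cost of a few extra bytes per heartbeat.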






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1522964906


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
  */
 private void replaceTargetAssignmentWithNewAssignment(
 ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+
+// Return if the same as current assignment; comparison without creating a new collection
+if (currentTargetAssignment != null) {
+// check if the new assignment is different from the current target assignment
+if (currentTargetAssignment.partitions.size() == assignment.topicPartitions().size() &&
+assignment.topicPartitions().stream().allMatch(
+tp -> currentTargetAssignment.partitions.containsKey(tp.topicId()) &&
+currentTargetAssignment.partitions.get(tp.topicId()).size() == tp.partitions().size() &&
+currentTargetAssignment.partitions.get(tp.topicId()).containsAll(tp.partitions())))) {
+return;
+}
+}
+
+// Bump local epoch and replace assignment
+long nextLocalEpoch;
+if (currentTargetAssignment == null) {
+nextLocalEpoch = 0;
+} else {
+nextLocalEpoch = currentTargetAssignment.localEpoch + 1;
+}

Review Comment:
   I think the server is bumping the target member epoch when any assignment 
for any member changes, while the local epoch is only bumped if the assignment 
for this specific member changes. So we should get fewer reconciliations.






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1522901846


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -889,43 +914,36 @@ private void transitionToStale() {
  */
 void maybeReconcile() {
 if (targetAssignmentReconciled()) {
-log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
 "current assignment.");
 return;
 }
 if (reconciliationInProgress) {
-log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
 currentTargetAssignment + " will be handled in the next reconciliation loop.");
 return;
 }
 
 // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
 // if some topic IDs are not resolvable.
 SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-ownedPartitions.addAll(subscriptions.assignedPartitions());
-
-// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
-// being reconciled. Needed for interactions with the centralized subscription state that
-// does not support topic IDs yet, and for the callbacks.
-SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions);
-
-// Check same assignment. Based on topic names for now, until topic IDs are properly
-// supported in the centralized subscription state object. Note that this check is
-// required to make sure that reconciliation is not triggered if the assignment ready to
-// be reconciled is the same as the current one (even though the member may remain
-// in RECONCILING state if it has some unresolved assignments).
-boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions);
-
-if (sameAssignmentReceived) {
+if (resolvedAssignment.equals(currentAssignment)) {

Review Comment:
   I guess we could do it, but do we have to, and is it the best thing to do? 
It seems like a corner case to me, and maybe the easiest and cleanest behavior 
is to just run a full reconciliation, because the assignment changed in the 
meantime, even if our client never managed to reconcile the intermediate 
assignment. It seems users use the ConsumerRebalanceListener to detect 
rebalances (as do our system tests). So the case you are describing sounds 
like a rebalance event to me, and I think we should call the listener. 






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521937262


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -210,12 +213,12 @@ public class MembershipManagerImpl implements MembershipManager {
 private final Map assignedTopicNamesCache;
 
 /**
- * Topic IDs and partitions received in the last target assignment. Items are added to this set
- * every time a target assignment is received. This is where the member collects the assignment
- * received from the broker, even though it may not be ready to fully reconcile due to missing
- * metadata.
+ * Topic IDs and partitions received in the last target assignment, together with its local epoch.
+ *
+ * This member reassigned every time a new assignment is received.

Review Comment:
   I don't quite get this sentence (was the intention just to start with 
`Reassigned every`... I guess...?)






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521933499


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -889,43 +914,36 @@ private void transitionToStale() {
  */
 void maybeReconcile() {
 if (targetAssignmentReconciled()) {
-log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
 "current assignment.");
 return;
 }
 if (reconciliationInProgress) {
-log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
 currentTargetAssignment + " will be handled in the next reconciliation loop.");
 return;
 }
 
 // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
 // if some topic IDs are not resolvable.
 SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-ownedPartitions.addAll(subscriptions.assignedPartitions());
-
-// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
-// being reconciled. Needed for interactions with the centralized subscription state that
-// does not support topic IDs yet, and for the callbacks.
-SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions);
-
-// Check same assignment. Based on topic names for now, until topic IDs are properly
-// supported in the centralized subscription state object. Note that this check is
-// required to make sure that reconciliation is not triggered if the assignment ready to
-// be reconciled is the same as the current one (even though the member may remain
-// in RECONCILING state if it has some unresolved assignments).
-boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions);
-
-if (sameAssignmentReceived) {
+if (resolvedAssignment.equals(currentAssignment)) {

Review Comment:
   This is short-circuiting the reconciliation if the same assignment is 
received (same epoch, same partitions). But we also need to consider the case 
where we get the same partitions assigned but in a different epoch. In that 
case, we should not carry on with the full reconciliation process (there is 
truly nothing to reconcile), but we should send an ack to the broker, so I 
would expect we need a similar short-circuit for 
`if sameAssignmentInDiffEpoch => transitionToAck & return;`.
   
   I'm mainly thinking about the case:
   - member owns t1-1 in epoch 3
   - receives new assignment [t1-1, t2-1] in epoch 4, gets stuck reconciling, 
e.g. missing t2 metadata
   - receives new assignment [t1-1] in epoch 5 (e.g. broker realized t2 has 
been deleted)
   - member does not need to reconcile t1-1, but should send an ack to the 
broker with t1-1, which it received in a newer epoch
   
   Makes sense? Not sure if I'm missing something regarding the 
expectations.
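The short-circuit proposed above can be expressed as a three-way decision. This is a minimal sketch with hypothetical names (`ReconcileAction`, `ReconcileDecision`), not the PR's code: it compares the resolved assignment with the currently owned one and only runs the full reconciliation (and the rebalance callbacks) when the partitions actually differ. Partitions are plain strings, and the epoch values are illustrative.

```java
import java.util.Set;

// Hypothetical outcome of comparing the resolved target with the owned assignment.
enum ReconcileAction { SKIP, ACK_ONLY, FULL_RECONCILIATION }

final class ReconcileDecision {
    static ReconcileAction decide(long resolvedEpoch, Set<String> resolved,
                                  long currentEpoch, Set<String> current) {
        boolean samePartitions = resolved.equals(current);
        if (samePartitions && resolvedEpoch == currentEpoch) {
            return ReconcileAction.SKIP;            // already reconciled and acked
        }
        if (samePartitions) {
            return ReconcileAction.ACK_ONLY;        // newer epoch, nothing to move: ack, no callbacks
        }
        return ReconcileAction.FULL_RECONCILIATION; // partitions changed: revoke/assign + callbacks
    }
}
```

In the scenario above, the epoch 5 assignment `[t1-1]` compared with the owned `[t1-1]` from epoch 3 yields `ACK_ONLY`; the alternative discussed in this thread is to treat that case as `FULL_RECONCILIATION` so the listener still fires.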
   






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521933499


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -889,43 +914,36 @@ private void transitionToStale() {
  */
 void maybeReconcile() {
 if (targetAssignmentReconciled()) {
-log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
 "current assignment.");
 return;
 }
 if (reconciliationInProgress) {
-log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
 currentTargetAssignment + " will be handled in the next reconciliation loop.");
 return;
 }
 
 // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
 // if some topic IDs are not resolvable.
 SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-ownedPartitions.addAll(subscriptions.assignedPartitions());
-
-// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
-// being reconciled. Needed for interactions with the centralized subscription state that
-// does not support topic IDs yet, and for the callbacks.
-SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions);
-
-// Check same assignment. Based on topic names for now, until topic IDs are properly
-// supported in the centralized subscription state object. Note that this check is
-// required to make sure that reconciliation is not triggered if the assignment ready to
-// be reconciled is the same as the current one (even though the member may remain
-// in RECONCILING state if it has some unresolved assignments).
-boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions);
-
-if (sameAssignmentReceived) {
+if (resolvedAssignment.equals(currentAssignment)) {

Review Comment:
   This is short-circuiting the reconciliation if the same assignment is
received (same epoch, same partitions). But we also need to consider the
case where we get the same partitions assigned in a different epoch. In that
case, we should not carry on with the full reconciliation process (there is
truly nothing to reconcile), but we should send an ack to the broker, so I
would expect we need a similar short-circuit for `if sameAssignmentInDiffEpoch
=> transitionToAck & return;`.
   
   I'm mainly thinking about this case:
   - member owns t1-1 in epoch 3
   - receives new assignment [t1-1, t2-1] in epoch 4 and gets stuck reconciling, e.g. because t2 metadata is missing
   - receives new assignment [t1-1] in epoch 5 (e.g. the broker realized t2 has been deleted)
   - member does not need to reconcile t1-1, but should send an ack to the broker with t1-1, which it received in a newer epoch
   Does that make sense? I may be missing something regarding the expectations.
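A minimal Java sketch of the short-circuit suggested above, assuming a simplified `LocalAssignment` that maps topic IDs (plain strings here, instead of Kafka's `Uuid`) to partition sets. `ReconcileAction`, `Reconciler.decide`, and the `of` helper are hypothetical names for illustration, not code from the PR.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Simplified assignment: an epoch plus topic ID -> partitions (topic IDs are strings here).
final class LocalAssignment {
    final long epoch;
    final Map<String, SortedSet<Integer>> partitions;

    LocalAssignment(long epoch, Map<String, SortedSet<Integer>> partitions) {
        this.epoch = epoch;
        this.partitions = Collections.unmodifiableMap(new TreeMap<>(partitions));
    }

    // Hypothetical helper: builds an assignment from entries such as "t1-1" (topic, '-', partition).
    static LocalAssignment of(long epoch, String... topicPartitions) {
        Map<String, SortedSet<Integer>> map = new TreeMap<>();
        for (String tp : topicPartitions) {
            int dash = tp.lastIndexOf('-');
            map.computeIfAbsent(tp.substring(0, dash), topic -> new TreeSet<>())
               .add(Integer.parseInt(tp.substring(dash + 1)));
        }
        return new LocalAssignment(epoch, map);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LocalAssignment)) return false;
        LocalAssignment other = (LocalAssignment) o;
        return epoch == other.epoch && partitions.equals(other.partitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(epoch, partitions);
    }
}

// Hypothetical outcomes of a reconciliation attempt.
enum ReconcileAction { SKIP, ACK_ONLY, RECONCILE }

final class Reconciler {
    static ReconcileAction decide(LocalAssignment resolved, LocalAssignment current) {
        if (resolved.equals(current)) {
            return ReconcileAction.SKIP;      // same epoch and same partitions: nothing to do
        }
        if (resolved.partitions.equals(current.partitions)) {
            return ReconcileAction.ACK_ONLY;  // same partitions in a new epoch: only ack it
        }
        return ReconcileAction.RECONCILE;     // partitions changed: full reconciliation
    }
}
```

For the scenario above, `Reconciler.decide(LocalAssignment.of(5, "t1-1"), LocalAssignment.of(3, "t1-1"))` returns `ACK_ONLY`: the partitions match, but the epoch is newer, so only an ack is needed.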
   






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521750747


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
  */
 private void replaceTargetAssignmentWithNewAssignment(
 ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+

Review Comment:
   1. I did not do it because the `LocalAssignment` is leaked from this file
via the `currentAssignment` method, and I didn't necessarily want to put so
much logic in the public interface `MembershipManager`. However, I think I
could define the return value of `currentAssignment` as a light
interface, and then put the fat class with all the updating logic in here. I'll
give it a try.
   2. I wouldn't necessarily call it `EMPTY` (to avoid confusion with an empty
assignment), but rather `NONE` or something similar; other than that, it sounds
like a good idea.
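A minimal sketch of the split described in point 1, assuming hypothetical names: `AssignmentView` stands for the light interface that `currentAssignment` could return, and `MutableAssignment` for the class that would hold the update logic inside `MembershipManagerImpl`. Topic IDs are plain strings for brevity.

```java
import java.util.Collections;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical read-only view that the public interface could expose.
interface AssignmentView {
    long localEpoch();
    Map<String, SortedSet<Integer>> partitions();
}

// Hypothetical implementation that stays private to the membership manager
// and carries the mutation logic.
final class MutableAssignment implements AssignmentView {
    private long localEpoch = -1; // -1 until the first assignment is applied
    private final Map<String, SortedSet<Integer>> partitions = new TreeMap<>();

    @Override
    public long localEpoch() {
        return localEpoch;
    }

    // Returns unmodifiable wrappers, so callers of the view cannot change the state.
    @Override
    public Map<String, SortedSet<Integer>> partitions() {
        Map<String, SortedSet<Integer>> view = new TreeMap<>();
        partitions.forEach((topic, ps) -> view.put(topic, Collections.unmodifiableSortedSet(ps)));
        return Collections.unmodifiableMap(view);
    }

    // Only reachable through the concrete type: replaces the partitions and bumps the local epoch.
    void replace(Map<String, SortedSet<Integer>> newPartitions) {
        partitions.clear();
        newPartitions.forEach((topic, ps) -> partitions.put(topic, new TreeSet<>(ps)));
        localEpoch++;
    }
}
```

Code holding only an `AssignmentView` can read the epoch and partitions but cannot call `replace`, which keeps the updating logic out of the public `MembershipManager` surface.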



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 }
 });
 
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat
+if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {

Review Comment:
   true, this can be simplified



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 
 // ClientAssignors - not supported yet
 
-// TopicPartitions - only sent if it has changed since the last heartbeat. Note that
-// the string consists of just the topic ID and the partitions. When an assignment is
-// received, we might not yet know the topic name, and then it is learnt subsequently
-// by a metadata update.
-TreeSet assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
-.map(entry -> entry.getKey() + "-" + entry.getValue())
-.collect(Collectors.toCollection(TreeSet::new));
-if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+// TopicPartitions - sent with the first heartbeat after a new assignment from the server was
+// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
+// including its local epoch is changed (although the local epoch is not sent in the heartbeat).
+LocalAssignment local = membershipManager.currentAssignment();
+if (local == null) {
+data.setTopicPartitions(Collections.emptyList());
+sentFields.topicPartitions = null;
+} else if (!local.equals(sentFields.topicPartitions)) {

Review Comment:
   That's what is being done. Maybe I should have renamed the field 
`sentFields.topicPartitions` to `sentFields.localAssignment`, but the type is 
now `LocalAssignment` and the `equals` compares the local epoch as well.
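A minimal sketch of that comparison, assuming a simplified `LocalAssignment` with string topic IDs and a hypothetical `HeartbeatFields` class standing in for `sentFields`. The topic partitions are resent whenever the local assignment differs from the last one sent, and because `equals` includes the local epoch, a bumped epoch triggers a resend even when the partitions are unchanged. Returning `null` to mean "omit the field" is a convention of this sketch only; the `null` assignment branch mirrors the diff above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Simplified LocalAssignment whose equals() covers both the local epoch and the partitions.
final class LocalAssignment {
    final long localEpoch;
    final Map<String, SortedSet<Integer>> partitions = new TreeMap<>();

    // Hypothetical convenience constructor: entries look like "t1-1" (topic ID, '-', partition).
    LocalAssignment(long localEpoch, String... topicPartitions) {
        this.localEpoch = localEpoch;
        for (String tp : topicPartitions) {
            int dash = tp.lastIndexOf('-');
            partitions.computeIfAbsent(tp.substring(0, dash), t -> new TreeSet<>())
                      .add(Integer.parseInt(tp.substring(dash + 1)));
        }
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof LocalAssignment
            && localEpoch == ((LocalAssignment) o).localEpoch
            && partitions.equals(((LocalAssignment) o).partitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(localEpoch, partitions);
    }
}

// Hypothetical stand-in for the heartbeat manager's sentFields bookkeeping.
final class HeartbeatFields {
    private LocalAssignment lastSent; // null: nothing sent yet, or the last heartbeat had no assignment

    // Returns the topic partitions to include, or null if the field can be omitted.
    List<String> topicPartitionsToSend(LocalAssignment local) {
        if (local == null) {
            lastSent = null;
            return Collections.emptyList();   // no assignment: send an empty list
        }
        if (local.equals(lastSent)) {
            return null;                      // unchanged, including the local epoch
        }
        lastSent = local;
        List<String> out = new ArrayList<>();
        local.partitions.forEach((topic, ps) -> ps.forEach(p -> out.add(topic + "-" + p)));
        return out;
    }
}
```

Sending `new LocalAssignment(0, "t1-1")` twice yields `["t1-1"]` and then `null`; a later `new LocalAssignment(1, "t1-1")` sends `["t1-1"]` again because only the local epoch changed.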






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521745679


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
  */
 private void replaceTargetAssignmentWithNewAssignment(
 ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+
+// Return if the same as current assignment; comparison without creating a new collection
+if (currentTargetAssignment != null) {
+// check if the new assignment is different from the current target assignment
+if (currentTargetAssignment.partitions.size() == assignment.topicPartitions().size() &&
+assignment.topicPartitions().stream().allMatch(
+tp -> currentTargetAssignment.partitions.containsKey(tp.topicId()) &&
+currentTargetAssignment.partitions.get(tp.topicId()).size() == tp.partitions().size() &&
+currentTargetAssignment.partitions.get(tp.topicId()).containsAll(tp.partitions()))) {
+return;
+}
+}
+
+// Bump local epoch and replace assignment
+long nextLocalEpoch;
+if (currentTargetAssignment == null) {
+nextLocalEpoch = 0;
+} else {
+nextLocalEpoch = currentTargetAssignment.localEpoch + 1;
+}

Review Comment:
   Is there a reason why we need to compute epochs on the client here? The
server is the one bumping the epochs whenever it computes a new assignment for
a member. I was expecting that we would just keep a `LocalAssignment` containing
the partitions and epoch that the broker sent in the
`ConsumerGroupHeartbeatResponseData`.
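For comparison, the client-side bump in the diff above boils down to a counter that advances only when the received partitions differ from the current target. The sketch below assumes string topic IDs and a hypothetical `TargetAssignmentTracker` class; note that it never consults the server's member epoch, which is what the question is about.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical condensed version of the diff's replaceTargetAssignmentWithNewAssignment.
final class TargetAssignmentTracker {
    private long localEpoch = -1;                    // no target assignment yet
    private Map<String, SortedSet<Integer>> target;  // null until the first assignment

    // Returns true if the target was replaced, in which case the local epoch was bumped.
    boolean replaceIfChanged(Map<String, SortedSet<Integer>> received) {
        if (target != null && target.equals(received)) {
            return false;                            // same partitions: keep the epoch
        }
        Map<String, SortedSet<Integer>> copy = new TreeMap<>();
        received.forEach((topic, ps) -> copy.put(topic, new TreeSet<>(ps)));
        target = copy;
        localEpoch++;                                // first assignment gets local epoch 0
        return true;
    }

    long localEpoch() {
        return localEpoch;
    }
}
```

Receiving the same partitions twice leaves the local epoch unchanged; any change to the partitions bumps it by one.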






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521632809


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 }
 });
 
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat
+if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {

Review Comment:
   It is static, taken from `max.poll.interval.ms`, so I agree we could set
it only when epoch == 0.






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521552751


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 }
 });
 
-// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat
+if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {

Review Comment:
   If I'm not mistaken, `rebalanceTimeoutMs` is a static config, so we
could actually set it only when joining the group (when epoch == 0).
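A minimal sketch of that simplification, assuming a hypothetical `HeartbeatRequest` holder rather than the real `ConsumerGroupHeartbeatRequestData`, and using -1 to mean "not set" (a convention of this sketch). Since the timeout is fixed when the builder is created, it only needs to be included when the member epoch is 0, and no `sentFields` comparison is required.

```java
// Hypothetical, simplified request; -1 means "field not set" in this sketch.
final class HeartbeatRequest {
    int memberEpoch;
    int rebalanceTimeoutMs = -1;
}

// Hypothetical builder; rebalanceTimeoutMs would come from max.poll.interval.ms.
final class HeartbeatRequestBuilder {
    private final int rebalanceTimeoutMs;

    HeartbeatRequestBuilder(int rebalanceTimeoutMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    HeartbeatRequest build(int memberEpoch) {
        HeartbeatRequest request = new HeartbeatRequest();
        request.memberEpoch = memberEpoch;
        // The value never changes after construction, so sending it on join (epoch 0) is enough.
        if (memberEpoch == 0) {
            request.rebalanceTimeoutMs = rebalanceTimeoutMs;
        }
        return request;
    }
}
```

With `new HeartbeatRequestBuilder(300000)` (300000 ms is the default `max.poll.interval.ms`), `build(0)` carries 300000 and any later epoch leaves the field at -1.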



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
  */
 private void replaceTargetAssignmentWithNewAssignment(
 ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+

Review Comment:
   I have two high-level thoughts, but I am not sure whether they are worth it:
   1) Have you considered moving all the update logic into `LocalAssignment`?
We could have a method such as `updateWith(Assignment)` which returns an
`Optional` containing the new assignment if it was updated.
   2) Along the same lines, I wonder if we could have an `EMPTY` constant for the
default `LocalAssignment(-1, null)` instead of relying on `null`. The reasoning
is that it avoids having to deal with `null` in a few places in
this file.
   What do you think?
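A minimal sketch combining both ideas, under stated assumptions: topic IDs are strings, the sentinel is called `NONE` (the name preferred in lucasbru's reply, to avoid confusion with an assignment that has no partitions) and holds an empty map instead of `null`, and `updateWith` takes the received partitions directly rather than the real `ConsumerGroupHeartbeatResponseData.Assignment`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

final class LocalAssignment {
    // Sentinel for "no assignment received yet", used instead of null.
    static final LocalAssignment NONE = new LocalAssignment(-1, Collections.emptyMap());

    final long localEpoch;
    final Map<String, SortedSet<Integer>> partitions;

    private LocalAssignment(long localEpoch, Map<String, SortedSet<Integer>> partitions) {
        // Defensive, immutable copy of the received partitions.
        Map<String, SortedSet<Integer>> copy = new TreeMap<>();
        partitions.forEach((topic, ps) ->
            copy.put(topic, Collections.unmodifiableSortedSet(new TreeSet<Integer>(ps))));
        this.localEpoch = localEpoch;
        this.partitions = Collections.unmodifiableMap(copy);
    }

    boolean isNone() {
        return this == NONE;
    }

    // Returns the new assignment (local epoch bumped) if the partitions changed, otherwise empty.
    Optional<LocalAssignment> updateWith(Map<String, SortedSet<Integer>> received) {
        if (!isNone() && partitions.equals(received)) {
            return Optional.empty();
        }
        return Optional.of(new LocalAssignment(localEpoch + 1, received));
    }
}
```

Starting from `NONE`, even an assignment with no partitions produces a real `LocalAssignment` at local epoch 0, which is the distinction a name like `EMPTY` would blur.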



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 
 // ClientAssignors - not supported yet
 
-// TopicPartitions - only sent if it has changed since the last heartbeat. Note that
-// the string consists of just the topic ID and the partitions. When an assignment is
-// received, we might not yet know the topic name, and then it is learnt subsequently
-// by a metadata update.
-TreeSet assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
-.map(entry -> entry.getKey() + "-" + entry.getValue())
-.collect(Collectors.toCollection(TreeSet::new));
-if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+// TopicPartitions - sent with the first heartbeat after a new assignment from the server was
+// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
+// including its local epoch is changed (although the local epoch is not sent in the heartbeat).
+LocalAssignment local = membershipManager.currentAssignment();
+if (local == null) {
+data.setTopicPartitions(Collections.emptyList());
+sentFields.topicPartitions = null;
+} else if (!local.equals(sentFields.topicPartitions)) {

Review Comment:
   Don't we need to also take the assignment epoch into consideration here? In 
other words, should we store the LocalAssignment in `sentFields` and use it to 
do the comparison?






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-11 Thread via GitHub


lucasbru commented on PR #15511:
URL: https://github.com/apache/kafka/pull/15511#issuecomment-1988762078

   @lianetm @dajac Could you please have a look?

