lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525940513


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -911,9 +911,13 @@ void maybeReconcile() {
         SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
         final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-        if (resolvedAssignment.equals(currentAssignment)) {
-            log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " +
-                    "is equal to the member current assignment.", resolvedAssignment);
+        if (currentAssignment != LocalAssignmentImpl.NONE &&
+            resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
+            resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+            log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +

Review Comment:
   I didn't want to talk about the local epoch here, since it's more of an 
implementation detail of how intermediate assignments are detected. But then I 
should log the local epoch, I suppose. Updated accordingly.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -1028,9 +1028,9 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.poll(time.milliseconds());
+        membershipManager.onHeartbeatRequestSent();

Review Comment:
   Done



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -2279,22 +2277,23 @@ private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscri
         return mockJoinAndReceiveAssignment(expectSubscriptionUpdated, createAssignment(expectSubscriptionUpdated));
     }
 
-    private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscriptionUpdated,
+    private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean triggerReconciliation,

Review Comment:
   Done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -976,6 +974,10 @@ void maybeReconcile() {
             }
 
             revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+        }).whenComplete((__, error) -> {
+            if (error != null) {
+                log.error("Reconciliation failed.", error);

Review Comment:
   Nope, the exception handling inside `revokeAndAssign` is only triggered if 
the revoke-and-assign future fails. If `revokeAndAssign` fails outside the 
future (in particular, there is logic inside `revokePartitions` that may 
fail), the exception falls through to here. Before this change it was silently 
swallowed, which cost me an hour of debugging to find.
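
   To illustrate the failure mode, here is a minimal, self-contained sketch. It 
is not the Kafka code: `SwallowedExceptionDemo`, `revokeAndAssignStub`, 
`reconcile`, and `reconcileWithLogging` are hypothetical names, and it assumes 
the callback runs as a dependent stage of a `CompletableFuture` (simplified 
here to `thenAccept`). An exception thrown synchronously inside such a callback 
never reaches the caller. It is only stored in the returned future, so nothing 
is reported unless a later stage inspects it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SwallowedExceptionDemo {

    // Hypothetical stand-in for revokeAndAssign: it throws synchronously,
    // before it gets a chance to return a future of its own.
    public static void revokeAndAssignStub() {
        throw new IllegalStateException("failure outside the inner future");
    }

    // Runs the stub inside a callback stage. The exception thrown by the
    // callback is captured by the returned future, not rethrown to the caller.
    public static CompletableFuture<Void> reconcile() {
        return CompletableFuture.completedFuture("commit done")
            .thenAccept(result -> revokeAndAssignStub());
    }

    // Same chain, plus a whenComplete stage that records the error, in the
    // spirit of the change under review. The list stands in for a logger.
    public static CompletableFuture<Void> reconcileWithLogging(List<String> logSink) {
        return reconcile().whenComplete((ignored, error) -> {
            if (error != null) {
                logSink.add("Reconciliation failed: " + error);
            }
        });
    }

    public static void main(String[] args) {
        // No exception surfaces here; the failure is only visible on the future.
        CompletableFuture<Void> silent = reconcile();
        System.out.println("completed exceptionally: " + silent.isCompletedExceptionally());

        // With the extra stage, the failure is recorded.
        List<String> logSink = new ArrayList<>();
        reconcileWithLogging(logSink);
        logSink.forEach(System.out::println);
    }
}
```

   Note that `whenComplete` only observes the error. The returned future still 
completes exceptionally, so callers that do check it see the same failure.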



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.