lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1952918176
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -818,6 +826,8 @@ void maybeReconcile() {
return;
}
+ if (autoCommitEnabled && !canCommit) return;
Review Comment:
I think we can improve this a bit more: if there are no partitions to
revoke, we can simply carry on with this reconciliation, meaning no delay
in reconciling newly added partitions (they are reconciled from the background
poll as before, with no need to wait for the app poll).
So, I expect we just need to move this check (along with the
markReconciliationInProgress call) to right before the log.info("Reconciling
assignment with local epoch...") statement, and then we could check:
```
if (autoCommitEnabled && !revokedPartitions.isEmpty() && !canCommit) return;
```
would that work?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1301,7 +1299,7 @@ public void
testUnresolvedTargetAssignmentIsReconciledWhenMetadataReceived() {
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -881,7 +879,7 @@ public void testDelayedMetadataUsedToCompleteAssignment() {
);
when(metadata.topicNames()).thenReturn(fullTopicMetadata);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -819,8 +818,7 @@ public void
testDelayedReconciliationResultAppliedWhenTargetChangedWithNewAssign
assertEquals(MemberState.RECONCILING, membershipManager.state());
clearInvocations(membershipManager, commitRequestManager);
- // Next poll should trigger final reconciliation
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
same here, if this test has no revocations (seems so?)
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1249,7 +1247,7 @@ public void
testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -2226,7 +2223,7 @@ public void
testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -2655,7 +2652,7 @@ private ConsumerMembershipManager
createMemberInStableState(String groupInstance
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
assertEquals(MemberState.RECONCILING, membershipManager.state());
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1403,7 +1401,7 @@ public void
testReconciliationSkippedWhenSameAssignmentReceived() {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
ditto? (not sure)
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -663,7 +663,7 @@ public void
testSameAssignmentReconciledAgainWithMissingTopic() {
// stay in RECONCILING state, since an unresolved topic is assigned
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
It would be helpful to see this passing with `maybeReconcile(false)` here, since
this test doesn't seem to have any revocations, right? (It would be good test
coverage to ensure we still reconcile in the same way, called from the background
poll, when it's only about adding partitions.)
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1594,7 +1591,7 @@ public void
testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
I expect this one should be reconciled ASAP (param `false`), while the next one
below (line 1618) would need to wait until it can commit (param `true`).
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1363,7 +1361,7 @@ public void
testReconcileNewPartitionsAssignedWhenNoPartitionOwned() {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1207,7 +1205,7 @@ public void
testNewAssignmentReplacesPreviousOneWaitingOnMetadata() {
receiveAssignment(topicId, Collections.singletonList(0),
membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1546,8 +1544,7 @@ public void
testMetadataUpdatesReconcilesUnresolvedAssignments() {
String topicName = "topic1";
mockTopicNameInMetadataCache(Collections.singletonMap(topicId,
topicName), true);
- // When the next poll is run, the member should re-trigger reconciliation
- membershipManager.poll(time.milliseconds());
+ membershipManager.maybeReconcile(true);
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]