lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1559430603


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -229,7 +229,11 @@ private GroupRebalanceConfig 
buildRebalanceConfig(Optional<String> groupInstance
     @AfterEach
     public void teardown() {
         this.metrics.close();
-        this.coordinator.close(time.timer(0));
+        try {
+            this.coordinator.close(time.timer(0));

Review Comment:
   correct. it was just less likely before.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1360,6 +1362,9 @@ public void commitSync(Map<TopicPartition, 
OffsetAndMetadata> offsets, Duration
             Timer requestTimer = time.timer(timeout.toMillis());
             SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, 
requestTimer);
             CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
+
+            awaitPendingAsyncCommits(requestTimer, false);

Review Comment:
   Done



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {

Review Comment:
   Done



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+    // This is testing the contract that asynchronous offset commit are 
completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commits sent 
previously with the
+    // `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+    // `commitSync` (given that it does not time out).
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(1, cb.successCount);
+
+    // Enforce looking up the coordinator
+    consumer.committed(Set(tp, tp2).asJava)
+
+    // Try with coordinator known
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(2L))).asJava, cb)
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(2L))).asJava)
+    assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+    assertEquals(2, cb.successCount);
+
+    // Try with empty sync commit
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(3L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+    assertEquals(3, cb.successCount);

Review Comment:
   Done



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+    // This is testing the contract that asynchronous offset commit are 
completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commits sent 
previously with the
+    // `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+    // `commitSync` (given that it does not time out).
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(1, cb.successCount);
+
+    // Enforce looking up the coordinator
+    consumer.committed(Set(tp, tp2).asJava)

Review Comment:
   True, it's not required. I added this to unconfuse people (because that 
`assertEquals` already has the effect is subtle). But instead I confused you, 
so let me remove it ;)



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+    // This is testing the contract that asynchronous offset commit are 
completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commits sent 
previously with the
+    // `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+    // `commitSync` (given that it does not time out).
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(1, cb.successCount);
+
+    // Enforce looking up the coordinator
+    consumer.committed(Set(tp, tp2).asJava)
+
+    // Try with coordinator known
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(2L))).asJava, cb)
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(2L))).asJava)
+    assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+    assertEquals(2, cb.successCount);

Review Comment:
   Done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
     }
 
     private boolean invokePendingAsyncCommits(Timer timer) {
-        if (inFlightAsyncCommits.get() == 0) {
+        if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) 
{

Review Comment:
   100%



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+    // This is testing the contract that asynchronous offset commit are 
completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);

Review Comment:
   Done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -984,6 +984,8 @@ public void close(final Timer timer) {
             }
         } finally {
             super.close(timer);
+            // Super-class close may wait for more commit callbacks to 
complete.
+            invokeCompletedOffsetCommitCallbacks();

Review Comment:
   That timer will be expired, right? So when we wait inside finally, we will 
not block again in `awaitPendingRequests` in the super-class.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+    // This is testing the contract that asynchronous offset commit are 
completed before the consumer
+    // is closed.

Review Comment:
   Agreed!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to