lucasbru commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1559430603
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -229,7 +229,11 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstance @AfterEach public void teardown() { this.metrics.close(); - this.coordinator.close(time.timer(0)); + try { + this.coordinator.close(time.timer(0)); Review Comment: correct. it was just less likely before. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1360,6 +1362,9 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration Timer requestTimer = time.timer(timeout.toMillis()); SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer); CompletableFuture<Void> commitFuture = commit(syncCommitEvent); + + awaitPendingAsyncCommits(requestTimer, false); Review Comment: Done ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { Review Comment: Done ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { + // This is testing the contract that asynchronous offset commit are completed 
before the consumer + // is closed. + val producer = createProducer() + sendRecords(producer, numRecords = 3, tp) + sendRecords(producer, numRecords = 3, tp2) + + val consumer = createConsumer() + consumer.assign(List(tp, tp2).asJava) + + // Try without looking up the coordinator first + val cb = new CountConsumerCommitCallback + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) + consumer.close() + assertEquals(2, cb.successCount); + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { + // This is testing the contract that asynchronous offset commits sent previously with the + // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of + // `commitSync` (given that it does not time out). 
+ val producer = createProducer() + sendRecords(producer, numRecords = 3, tp) + sendRecords(producer, numRecords = 3, tp2) + + val consumer = createConsumer() + consumer.assign(List(tp, tp2).asJava) + + // Try without looking up the coordinator first + val cb = new CountConsumerCommitCallback + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) + consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) + assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(1, cb.successCount); + + // Enforce looking up the coordinator + consumer.committed(Set(tp, tp2).asJava) + + // Try with coordinator known + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb) + consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava) + assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset) + assertEquals(2, cb.successCount); + + // Try with empty sync commit + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava, cb) + consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) + assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset) + assertEquals(3, cb.successCount); Review Comment: Done ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { + // 
This is testing the contract that asynchronous offset commit are completed before the consumer + // is closed. + val producer = createProducer() + sendRecords(producer, numRecords = 3, tp) + sendRecords(producer, numRecords = 3, tp2) + + val consumer = createConsumer() + consumer.assign(List(tp, tp2).asJava) + + // Try without looking up the coordinator first + val cb = new CountConsumerCommitCallback + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) + consumer.close() + assertEquals(2, cb.successCount); + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { + // This is testing the contract that asynchronous offset commits sent previously with the + // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of + // `commitSync` (given that it does not time out). + val producer = createProducer() + sendRecords(producer, numRecords = 3, tp) + sendRecords(producer, numRecords = 3, tp2) + + val consumer = createConsumer() + consumer.assign(List(tp, tp2).asJava) + + // Try without looking up the coordinator first + val cb = new CountConsumerCommitCallback + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) + consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) + assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(1, cb.successCount); + + // Enforce looking up the coordinator + consumer.committed(Set(tp, tp2).asJava) Review Comment: True, it's not required. I added this to avoid confusing people (because the fact that the `assertEquals` already has that effect is subtle). 
But instead I confused you, so let me remove it ;) ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { + // This is testing the contract that asynchronous offset commit are completed before the consumer + // is closed. + val producer = createProducer() + sendRecords(producer, numRecords = 3, tp) + sendRecords(producer, numRecords = 3, tp2) + + val consumer = createConsumer() + consumer.assign(List(tp, tp2).asJava) + + // Try without looking up the coordinator first + val cb = new CountConsumerCommitCallback + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) + consumer.close() + assertEquals(2, cb.successCount); + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { + // This is testing the contract that asynchronous offset commits sent previously with the + // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of + // `commitSync` (given that it does not time out). 
+ val producer = createProducer() + sendRecords(producer, numRecords = 3, tp) + sendRecords(producer, numRecords = 3, tp2) + + val consumer = createConsumer() + consumer.assign(List(tp, tp2).asJava) + + // Try without looking up the coordinator first + val cb = new CountConsumerCommitCallback + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) + consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) + assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(1, cb.successCount); + + // Enforce looking up the coordinator + consumer.committed(Set(tp, tp2).asJava) + + // Try with coordinator known + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb) + consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava) + assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset) + assertEquals(2, cb.successCount); Review Comment: Done ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) { } private boolean invokePendingAsyncCommits(Timer timer) { - if (inFlightAsyncCommits.get() == 0) { + if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) { Review Comment: 100% ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { + // This is 
testing the contract that asynchronous offset commit are completed before the consumer + // is closed. + val producer = createProducer() + sendRecords(producer, numRecords = 3, tp) + sendRecords(producer, numRecords = 3, tp2) + + val consumer = createConsumer() + consumer.assign(List(tp, tp2).asJava) + + // Try without looking up the coordinator first + val cb = new CountConsumerCommitCallback + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) + consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) + consumer.close() + assertEquals(2, cb.successCount); Review Comment: Done ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -984,6 +984,8 @@ public void close(final Timer timer) { } } finally { super.close(timer); + // Super-class close may wait for more commit callbacks to complete. + invokeCompletedOffsetCommitCallbacks(); Review Comment: That timer will be expired, right? So when we wait inside finally, we will not block again in `awaitPendingRequests` in the super-class. ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { + // This is testing the contract that asynchronous offset commit are completed before the consumer + // is closed. Review Comment: Agreed! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org