Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-22 Thread via GitHub


lucasbru merged PR #15613:
URL: https://github.com/apache/kafka/pull/15613


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1572360676


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first

Review Comment:
   Okay got it. Yeah, the state "async commit pending because I do not know the 
coordinator yet" and the state "async commit is already sent to the 
coordinator" may cover two different cases. In fact, in the legacy consumer, 
these are two independent states the async commit can be in. In the async 
consumer as well, although waiting for the async commit to complete works 
independently of that state (since the different states are handled by the 
background thread, and we simplify wait for the future in the foreground 
thread). 



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +634,85 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+final TopicPartition tp = new TopicPartition("foo", 0);
+testIncompleteAsyncCommit(tp);
+
+final CompletableFuture asyncCommitFuture = 
getLastEnqueuedEventFuture();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+// Complete exceptionally async commit event
+asyncCommitFuture.completeExceptionally(new KafkaException("Test 
exception"));
+
+// Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), 
Duration.ofMillis(100)));
+}
+
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() 
{
+final TopicPartition tp = new TopicPartition("foo", 0);
+testIncompleteAsyncCommit(tp);
+
+final CompletableFuture asyncCommitFuture = 
getLastEnqueuedEventFuture();
+
+// Mock to complete sync event
+completeCommitSyncApplicationEventSuccessfully();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 

Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


cadonna commented on PR #15613:
URL: https://github.com/apache/kafka/pull/15613#issuecomment-2066478507

   > > @lucasbru Thanks for the PR!
   > > The unit tests you added fail in the build and also for me locally.
   > > Plus, I have a question regarding the integration tests.
   > 
   > @cadonna Thanks for the review, I hadn't noticed the test failures. Seems 
the `ArgumentCaptor`s type-matching introduced with Mockito 5 does not work on 
Java 8. I committed a workaround. Also, I'm both shocked and impressed that you 
are using Java 8 locally.
   
   It has to run at least on Java 8, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


cadonna commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1572290394


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first

Review Comment:
   Fair enough, but why is that important? Is the intention that the async 
commit needs to lookup the group coordinator?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1005,6 +1102,43 @@ public void testNoWakeupInCloseCommit() {
 assertFalse(capturedEvent.get().future().isCompletedExceptionally());
 }
 
+@Test
+public void testCloseAwaitPendingAsyncCommitIncomplete() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));

Review Comment:
   Do we need this stub?



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).

Review Comment:
   ```suggestion
   // disabled, or simply because there are no consumed offsets).
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +634,85 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+final TopicPartition tp = new TopicPartition("foo", 0);
+testIncompleteAsyncCommit(tp);
+
+final CompletableFuture asyncCommitFuture 

Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


lucasbru commented on PR #15613:
URL: https://github.com/apache/kafka/pull/15613#issuecomment-2066392315

   > @lucasbru Thanks for the PR!
   > 
   > The unit tests you added fail in the build and also for me locally.
   > 
   > Plus, I have a question regarding the integration tests.
   
   @cadonna Thanks for the review, I hadn't noticed the test failures. Seems 
the `ArgumentCaptor`s type-matching introduced with Mockito 5 does not work on 
Java 8. I committed a workaround. Also, I'm both shocked and impressed that you 
are using Java 8 locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1572109503


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(1, cb.successCount)
+
+// Try with coordinator known

Review Comment:
   We ar guaranteed to have looked up the coordinator here, because we did 
request data from it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1572109078


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first

Review Comment:
   We are not guaranteed to have looked up the coordinator here, because we did 
not request any data from it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-18 Thread via GitHub


cadonna commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1570932239


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first

Review Comment:
   I do not understand those comments. How do I recognize in the code that the 
coordinator is not looked up first? And why is that so important?



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val 

Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-17 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1569228945


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+// Complete async commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());

Review Comment:
   Interesting. Good to know. Fuzzy temporal logic 臘‍♂️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-17 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1569232314


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+// Complete async commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());
+final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+commitEvent.future().complete(null);
+
+// Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), 
Duration.ofMillis(100)));
+}
+
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() 
{
+final TopicPartition tp = new TopicPartition("foo", 0);
+testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+// Complete async commit event and sync commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());
+final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+commitEvent.future().complete(null);
+
+// Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+assertDoesNotThrow(() -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 
Duration.ofMillis(100)));
+}
+
+@Test
+public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() {
+final TopicPartition tp = new TopicPartition("foo", 0);
+testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+// Complete exceptionally async commit event and sync commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());
+final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+commitEvent.future().completeExceptionally(new KafkaException("Test 
exception"));
+
+// Commit async is completed exceptionally, but this will be handled 
by commit callback - commit sync should not fail.
+assertDoesNotThrow(() -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 
Duration.ofMillis(100)));
+}
+
+private void 
testSyncCommitTimesoutAfterIncompleteAsyncCommit(TopicPartition tp) {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);

Review Comment:
   Yes, that's what I meant  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-17 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1569231774


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();

Review Comment:
    When I see duplicate (or nearly duplicated) code, my brain turns up its 
sensitivity because I assume there's some devil-in-the-details reason that the 
code wasn't reused.
   
   I'd assumed that it could be ever-so-slightly refactored into using 
`testSyncCommitTimesoutAfterIncompleteAsyncCommit()`, but no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-17 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1569226961


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -793,9 +795,9 @@ public void commitAsync(Map offsets, OffsetCo
 }
 
 private CompletableFuture commit(final CommitEvent commitEvent) {
-maybeThrowFencedInstanceException();
-maybeInvokeCommitCallbacks();
 maybeThrowInvalidGroupIdException();
+maybeThrowFencedInstanceException();
+offsetCommitCallbackInvoker.executeCallbacks();

Review Comment:
   Works for me. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-17 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1568770827


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1388,6 +1393,31 @@ public void commitSync(Map offsets, Duration
 }
 }
 
+private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {
+if (lastPendingAsyncCommit == null) {
+return;
+}
+
+try {
+final CompletableFuture futureToAwait = new 
CompletableFuture<>();
+// We don't want the wake-up trigger to complete our pending async 
commit future,
+// so create new future here. Any errors in the pending async 
commit will be handled
+// by the async commit future / the commit callback - here, we 
just want to wait for it to complete.
+lastPendingAsyncCommit.whenComplete((v, t) -> 
futureToAwait.complete(null));
+if (!disableWakeup) {
+wakeupTrigger.setActiveTask(futureToAwait);
+}
+ConsumerUtils.getResult(futureToAwait, timer);

Review Comment:
   Yes. I think if `lastPendingAsyncCommit` is completed before entering here, 
the `whenComplete` will execute immediately.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -158,7 +158,11 @@ public class AsyncKafkaConsumerTest {
 public void resetAll() {
 backgroundEventQueue.clear();
 if (consumer != null) {
-consumer.close(Duration.ZERO);
+try {
+consumer.close(Duration.ZERO);
+} catch (Exception e) {
+assertInstanceOf(KafkaException.class, e);
+}

Review Comment:
   `resetAll` isn't supposed to test anything, so this also shouldn't mask 
anything. It's purely for cleanup. In this case, it only affects two tests that 
will timeout on close (since we don't mock an async commit response). So let me 
do it. But in general, I wonder if adding clean-up logic to the tests itself 
won't reduce readability/clarity of the actual test case.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+// Complete async commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());

Review Comment:
   Yes. I agree, it's a bit weird, but mockito is recording those invocations 
with copies (references) of all arguments. That's also why spying on lots of 
objects in busy event loops will accumulate lots of such "invocation objects" 
in memory.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();

Review Comment:
   I guess I may be less concerned with code duplication in test setup than my 
reviewers :). Done. Added another helper method that contains the lines up to 
the incomplete async commit.



##

Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-16 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1568001935


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1388,6 +1393,31 @@ public void commitSync(Map offsets, Duration
 }
 }
 
+private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {

Review Comment:
   nit: consider changing `disableWakeup` to `enableWakeup`. Double-negatives 
add nonzero cognitive overhead.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1388,6 +1393,31 @@ public void commitSync(Map offsets, Duration
 }
 }
 
+private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {
+if (lastPendingAsyncCommit == null) {
+return;
+}
+
+try {
+final CompletableFuture futureToAwait = new 
CompletableFuture<>();
+// We don't want the wake-up trigger to complete our pending async 
commit future,
+// so create new future here. Any errors in the pending async 
commit will be handled
+// by the async commit future / the commit callback - here, we 
just want to wait for it to complete.
+lastPendingAsyncCommit.whenComplete((v, t) -> 
futureToAwait.complete(null));
+if (!disableWakeup) {
+wakeupTrigger.setActiveTask(futureToAwait);
+}
+ConsumerUtils.getResult(futureToAwait, timer);

Review Comment:
   Is it true that the underlying `lastPendingAsyncCommit` `Future` could 
already be completed by this point, right?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();

Review Comment:
   Can this be replaced with a call to 
`testSyncCommitTimesoutAfterIncompleteAsyncCommit()` like the other tests? I 
glanced back and forth a couple of times and didn't see too much difference:
   
   ```suggestion
   final TopicPartition tp = new TopicPartition("foo", 0);
   testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+// Complete async commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());

Review Comment:
   This use of JUnit is just about over my head...
   
   For my own understanding, at which line in this test does the 
`AsyncCommitEvent` get created and enqueued? I would assume at line 634, right?
   
   It looks like you're able to add the `ArgumentCaptor` _after_ the object 
pointed at by the argument was created. Is that correct? 樂 



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1005,6 +1083,43 @@ public void testNoWakeupInCloseCommit() {
 assertFalse(capturedEvent.get().future().isCompletedExceptionally());
 }
 
+@Test
+public 

Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-15 Thread via GitHub


lucasbru commented on PR #15613:
URL: https://github.com/apache/kafka/pull/15613#issuecomment-2056077615

   @lianetm thanks for the comments, addressed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-15 Thread via GitHub


lucasbru commented on PR #15613:
URL: https://github.com/apache/kafka/pull/15613#issuecomment-2056076747

   @cadonna Could you please review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-11 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1561902955


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1388,6 +1393,33 @@ public void commitSync(Map offsets, Duration
 }
 }
 
+private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {
+if (lastPendingAsyncCommit == null) {
+return;
+}
+
+try {
+final CompletableFuture futureToAwait = new 
CompletableFuture<>();
+// We don't want the wake-up trigger to complete our pending async 
commit future,
+// so create new future here. Any errors in the pending async 
commit will be handled
+// by the async commit future / the commit callback - here, we 
just want to wait for it to complete.
+lastPendingAsyncCommit.whenComplete((v, t) -> {

Review Comment:
   nit: we could loose the braces and inline this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-11 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1561901442


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,90 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+// Complete async commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());
+final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+commitEvent.future().complete(null);
+
+// Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), 
Duration.ofMillis(100)));
+}
+
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() 
{
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();
+
+// Mock to complete sync event
+completeCommitSyncApplicationEventSuccessfully();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 
Duration.ofMillis(100)));
+
+// Complete async commit event and sync commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());
+final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+commitEvent.future().complete(null);
+
+// Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+assertDoesNotThrow(() -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 
Duration.ofMillis(100)));
+}
+
+@Test
+public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();
+
+// Mock to complete sync event
+completeCommitSyncApplicationEventSuccessfully();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 
Duration.ofMillis(100)));

Review Comment:
   This section seems to be doing exactly the same as in the initial section of 
the previous test? maybe clearer to encapsulate in a common 
`testSyncCommitTimesoutAfterIncompleteAsyncCommit`, and then each tests 
proceeds completing the commits its way?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-11 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1561212534


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -984,6 +984,8 @@ public void close(final Timer timer) {
 }
 } finally {
 super.close(timer);
+// Super-class close may wait for more commit callbacks to 
complete.
+invokeCompletedOffsetCommitCallbacks();

Review Comment:
   You're right! (silly me, I was probably thinking about a duration arg 
instead of a timer), all good then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-11 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1560885552


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() {
 }
 }
 
-private void maybeInvokeCommitCallbacks() {
-offsetCommitCallbackInvoker.executeCallbacks();
-}
-

Review Comment:
   For me, abstracting this one-liner is more obfuscating than it is helping, 
but if you insist, I can bring it back.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1388,6 +1393,37 @@ public void commitSync(Map offsets, Duration
 }
 }
 
+private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {
+if (lastPendingAsyncCommit == null) {
+return;
+}
+
+try {
+CompletableFuture futureToAwait;
+if (!disableWakeup) {
+// We don't want the wake-up trigger to complete our pending 
async commit future,
+// so create new future here.
+futureToAwait = new CompletableFuture<>();
+lastPendingAsyncCommit.whenComplete((v, t) -> {
+if (t != null) {
+futureToAwait.completeExceptionally(t);
+} else {
+futureToAwait.complete(v);
+}
+});
+wakeupTrigger.setActiveTask(futureToAwait);
+} else {
+futureToAwait = lastPendingAsyncCommit;
+}
+ConsumerUtils.getResult(futureToAwait, timer);
+lastPendingAsyncCommit = null;
+} finally {
+if (!disableWakeup) wakeupTrigger.clearTask();
+timer.update();
+}

Review Comment:
   I think always clearing it in `finally` would mean that 
`lastPendingAsyncCommit` is cleared even though we timed out or were woken up 
while waiting for it.
   
   However, this brought up another issue - what happens when the async commit 
future completes exceptionally? We'd throw the exception here, but we shouldn't 
- the error will be handled inside the future. So basically here we want to 
wait for the async commit, not worrying about return value or exception. And 
then, the only cases why we fail here should be wake-up or time out, and in 
both cases, we should check again for the future to be completed the next time 
we trigger commit sync.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -157,8 +157,10 @@ public class AsyncKafkaConsumerTest {
 @AfterEach
 public void resetAll() {
 backgroundEventQueue.clear();
-if (consumer != null) {
+try {
 consumer.close(Duration.ZERO);
+} catch (Exception e) {
+// ignore

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-10 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1559907295


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -157,8 +157,10 @@ public class AsyncKafkaConsumerTest {
 @AfterEach
 public void resetAll() {
 backgroundEventQueue.clear();
-if (consumer != null) {
+try {
 consumer.close(Duration.ZERO);
+} catch (Exception e) {
+// ignore

Review Comment:
   I'm a little leery about swallowing the exception here. Can we validate the 
exception type is something we expect? e.g.:
   
   ```suggestion
   } catch (Exception e) {
   assertInstanceOf(KafkaException.class, e);
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1388,6 +1393,37 @@ public void commitSync(Map offsets, Duration
 }
 }
 
+private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {
+if (lastPendingAsyncCommit == null) {
+return;
+}
+
+try {
+CompletableFuture futureToAwait;
+if (!disableWakeup) {
+// We don't want the wake-up trigger to complete our pending 
async commit future,
+// so create new future here.
+futureToAwait = new CompletableFuture<>();
+lastPendingAsyncCommit.whenComplete((v, t) -> {
+if (t != null) {
+futureToAwait.completeExceptionally(t);
+} else {
+futureToAwait.complete(v);
+}
+});
+wakeupTrigger.setActiveTask(futureToAwait);
+} else {
+futureToAwait = lastPendingAsyncCommit;
+}
+ConsumerUtils.getResult(futureToAwait, timer);
+lastPendingAsyncCommit = null;
+} finally {
+if (!disableWakeup) wakeupTrigger.clearTask();
+timer.update();
+}

Review Comment:
   Do we want to clear out the `lastPendingAsyncCommit` in the `finally` block:
   
   ```suggestion
   ConsumerUtils.getResult(futureToAwait, timer);
   } finally {
   lastPendingAsyncCommit = null;
   if (!disableWakeup) wakeupTrigger.clearTask();
   timer.update();
   }
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() {
 }
 }
 
-private void maybeInvokeCommitCallbacks() {
-offsetCommitCallbackInvoker.executeCallbacks();
-}
-

Review Comment:
   Any reason we don't want to keep this method abstraction?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-10 Thread via GitHub


lucasbru commented on PR #15613:
URL: https://github.com/apache/kafka/pull/15613#issuecomment-2047945261

   Hey @lianetm. I split the PR into two, the changes for the legacy consumer 
go into https://github.com/apache/kafka/pull/15693. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org