Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on PR #15613:
URL: https://github.com/apache/kafka/pull/15613#issuecomment-2040495611

   Thanks for the changes @lucasbru, looks good to me overall. This tidies up
the whole story of how async commit callbacks get executed. I left some
comments, mostly minor, and a few to make sure we're on the same page about
the reasoning behind the change.
   
   Should we update the PR description to mention not only
`consumer.commitSync()` but also `consumer.close()`? Both are fixed here to
ensure that callbacks of previous async commits are always executed.
   
   Thanks! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554162586


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commit are completed before the consumer
+    // is closed.

Review Comment:
   I would extend it with something like: "...before the consumer is closed,
**even when no sync commit is performed as part of the close (because
auto-commit is disabled, or simply because there are no consumed offsets).**"
   
   As I see it, that's the key case fixed in this PR and tested here. If the
call to `consumer.close()` performs an actual sync commit (which requires
auto-commit to be enabled and a non-empty set of consumed offsets), then I
expect the async callbacks were always called. The contract was not respected
when the sync commit did not happen, for either of the reasons mentioned
above. Agree?
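
   To make the gap concrete, here is a minimal, self-contained sketch. It is a toy model, not Kafka code. `ToyCommitConsumer`, `CountingCallback` (a stand-in for the test's `CountConsumerCommitCallback`) and the `drainAlways` flag are all hypothetical. The sync commit on close is reduced to the condition "auto-commit enabled and offsets consumed". With `drainAlways = false`, callbacks only run as a side effect of that sync commit. With `drainAlways = true`, they run on every close, which is the contract described above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for the test's CountConsumerCommitCallback.
final class CountingCallback implements Consumer<Boolean> {
    int successCount = 0;

    @Override
    public void accept(Boolean success) {
        if (success) {
            successCount++;
        }
    }
}

// Hypothetical toy model, not Kafka code: it only tracks which async-commit
// callbacks are still waiting to be invoked.
final class ToyCommitConsumer {
    private final boolean autoCommitEnabled;
    private final Queue<Consumer<Boolean>> pendingCallbacks = new ArrayDeque<>();
    private boolean hasConsumedOffsets = false;

    ToyCommitConsumer(boolean autoCommitEnabled) {
        this.autoCommitEnabled = autoCommitEnabled;
    }

    void markOffsetsConsumed() {
        hasConsumedOffsets = true;
    }

    void commitAsync(Consumer<Boolean> callback) {
        // In this model every async commit eventually succeeds; only the callback is deferred.
        pendingCallbacks.add(callback);
    }

    // drainAlways = false models the gap: callbacks only run as a side effect of the
    // sync commit done on close. drainAlways = true models the contract under discussion.
    void close(boolean drainAlways) {
        boolean syncCommitOnClose = autoCommitEnabled && hasConsumedOffsets;
        if (syncCommitOnClose || drainAlways) {
            while (!pendingCallbacks.isEmpty()) {
                pendingCallbacks.poll().accept(true);
            }
        }
    }
}
```

   For example, with auto-commit disabled, two `commitAsync` calls followed by `close(false)` leave `successCount` at 0. With `close(true)` it reaches 2, which is what `assertEquals(2, cb.successCount)` in the test expects.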




Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554177925


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commit are completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commits sent previously with the
+    // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of
+    // `commitSync` (given that it does not time out).
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(1, cb.successCount);
+
+    // Enforce looking up the coordinator
+    consumer.committed(Set(tp, tp2).asJava)

Review Comment:
   I'd say we don't need this: the `committed` call inside the successful
`assertEquals` above (ln 694) already requires the coordinator to be looked up.




Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554165141


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   nit: maybe a better name is testCommitAsyncCompleted**Before**ConsumerCloses
(clearer, and consistent with the similar test below)




Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554025678


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commit are completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commits sent previously with the
+    // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of
+    // `commitSync` (given that it does not time out).
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(1, cb.successCount);
+
+    // Enforce looking up the coordinator
+    consumer.committed(Set(tp, tp2).asJava)
+
+    // Try with coordinator known
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb)
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava)
+    assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+    assertEquals(2, cb.successCount);
+
+    // Try with empty sync commit
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+    assertEquals(3, cb.successCount);

Review Comment:
   ditto (unneeded semicolon)




Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554024796


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commit are completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commits sent previously with the
+    // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of
+    // `commitSync` (given that it does not time out).
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(1, cb.successCount);
+
+    // Enforce looking up the coordinator
+    consumer.committed(Set(tp, tp2).asJava)
+
+    // Try with coordinator known
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb)
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava)
+    assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+    assertEquals(2, cb.successCount);

Review Comment:
   nit: unneeded semicolon




Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554024079


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commit are completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);

Review Comment:
   nit: unneeded semicolon. The jump from Java to Scala tricks us all... been there :)




Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554020736


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -229,7 +229,11 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional groupInstance
     @AfterEach
     public void teardown() {
         this.metrics.close();
-        this.coordinator.close(time.timer(0));
+        try {
+            this.coordinator.close(time.timer(0));

Review Comment:
   I see. I'd say it's fine to throw the error at the coordinator level (and
live with code like this).
   
   Actually, as I see it, the need for this catch is not introduced by this PR.
Even before this PR, the coordinator close could throw a fenced exception for
async commits that were waiting for the coordinator and then completed
[here](https://github.com/apache/kafka/blob/fd9c7d2932dee055289b403e37a0bbb631c080a9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L983)
with a fenced error.
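
   For reference, the "throw at the coordinator level and tolerate it in the test" shape might look like the sketch below. Everything in it is hypothetical. `FencedException` stands in for whichever fenced error the real `ConsumerCoordinator.close()` surfaces, and `ToyCoordinator`/`ToyTeardown` are illustrative only. The quoted hunk stops right after `try {`, so the catch clause is an assumption, not the PR's code.

```java
// Hypothetical stand-in for whichever fenced error the real coordinator surfaces.
final class FencedException extends RuntimeException {
    FencedException(String message) {
        super(message);
    }
}

// Hypothetical coordinator: close() completes a pending async commit and, as described
// in the comment above, rethrows the fenced error that commit completed with.
final class ToyCoordinator {
    private final boolean pendingCommitFenced;

    ToyCoordinator(boolean pendingCommitFenced) {
        this.pendingCommitFenced = pendingCommitFenced;
    }

    void close() {
        if (pendingCommitFenced) {
            throw new FencedException("async commit completed with a fenced error");
        }
    }
}

final class ToyTeardown {
    // The coordinator keeps throwing; the test teardown tolerates the fenced error.
    // Returns true if a fenced error was caught and ignored.
    static boolean teardown(ToyCoordinator coordinator) {
        try {
            coordinator.close();
            return false;
        } catch (FencedException e) {
            // Expected for tests whose async commits end up fenced.
            return true;
        }
    }
}
```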




Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554011665


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   We have a new `PlaintextConsumerCommitTest` for all commit-related tests.
I would say these two should go there.




Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543492385


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -984,6 +984,8 @@ public void close(final Timer timer) {
             }
         } finally {
             super.close(timer);
+            // Super-class close may wait for more commit callbacks to complete.
+            invokeCompletedOffsetCommitCallbacks();

Review Comment:
   Agreed. There could be async requests, with a known coordinator, that don't
get a response within the sync commit time above but do get one while
`super.close` waits, so we should trigger the callbacks at this point.
   
   But this makes me notice: aren't we breaking the `close(Duration)` contract
here by calling `super.close(timer)` in the `finally` clause? Say some async
requests don't get a response within the timeout in the `while` loop above (so
we block for the whole timeout there). Then, in `finally`, the superclass
blocks for that time again
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1139).
Am I missing something? (If not, I can file a separate Jira.)
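
   Here is a small sketch of the bound that the `close(Duration)` contract implies. `ManualClock`, `Deadline` and `ToyClose` are hypothetical, and each phase is assumed to block for its whole budget because no response ever arrives. Whether the current coordinator code behaves like the "fresh budgets" variant is exactly the open question above. The sketch does not answer it; it only contrasts the two ways of budgeting time across the `while` loop and the superclass close.

```java
// Hypothetical manual clock so the timing below is deterministic.
final class ManualClock {
    private long currentMs = 0;

    long nowMs() {
        return currentMs;
    }

    void sleep(long ms) {
        currentMs += ms;
    }
}

// Hypothetical deadline shared by several phases of close().
final class Deadline {
    private final ManualClock clock;
    private final long deadlineMs;

    Deadline(ManualClock clock, long timeoutMs) {
        this.clock = clock;
        this.deadlineMs = clock.nowMs() + timeoutMs;
    }

    long remainingMs() {
        return Math.max(0L, deadlineMs - clock.nowMs());
    }
}

final class ToyClose {
    // Worst case assumed here: no response arrives, so each phase blocks for its whole budget.
    private static void awaitResponses(ManualClock clock, long budgetMs) {
        clock.sleep(budgetMs);
    }

    // Each phase gets a fresh budget: total blocking can reach twice the timeout.
    static long closeWithFreshBudgets(ManualClock clock, long timeoutMs) {
        long start = clock.nowMs();
        awaitResponses(clock, timeoutMs); // phase 1: wait for async commit responses
        awaitResponses(clock, timeoutMs); // phase 2: superclass close waits again
        return clock.nowMs() - start;
    }

    // Both phases draw on one deadline: total blocking stays within the timeout.
    static long closeWithSharedDeadline(ManualClock clock, long timeoutMs) {
        long start = clock.nowMs();
        Deadline deadline = new Deadline(clock, timeoutMs);
        awaitResponses(clock, deadline.remainingMs());
        awaitResponses(clock, deadline.remainingMs());
        return clock.nowMs() - start;
    }
}
```

   With a 100 ms timeout, `closeWithFreshBudgets` blocks for 200 ms of simulated time, while `closeWithSharedDeadline` stays at 100 ms.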




Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
     }
 
     private boolean invokePendingAsyncCommits(Timer timer) {
-        if (inFlightAsyncCommits.get() == 0) {
+        if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) {

Review Comment:
   This makes sense to me: it fills a gap for a sync commit with empty
offsets. That case skips the path that sends an actual request, which, as I
see it, is why it loses the guarantee of executing the callbacks.
   
   This makes the logic consistent with what happens when the sync commit has
non-empty offsets. In that case, the callbacks for previous async commits that
were waiting for the coordinator do get executed. The sync commit is blocked on
the same FindCoordinator request (there's always just one), and the moment the
coordinator is found, the async commit is marked as in-flight
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036),
so it is considered for callbacks
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121).
   
   Am I getting the reasoning for the change right?
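
   The reasoning above can be checked with a tiny model of the two counters. It is a sketch, not the real `ConsumerCoordinator`. The class, the `nothingToAwait*` methods and the transitions are hypothetical, and the model is single-threaded (its get-then-decrement pairs would not be safe under concurrency). An async commit issued before the coordinator is known only bumps `pendingAsyncCommits`. So for an empty sync commit, the old check (`inFlightAsyncCommits.get() == 0`) returns early, while the new check keeps waiting.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the two counters in the hunk above (not the real class):
//   pendingAsyncCommits  - async commits still waiting for the coordinator lookup
//   inFlightAsyncCommits - async commits already sent to a known coordinator
final class AsyncCommitCounters {
    final AtomicInteger pendingAsyncCommits = new AtomicInteger();
    final AtomicInteger inFlightAsyncCommits = new AtomicInteger();

    // Old early return: "nothing to wait for" as soon as no commit is in flight.
    boolean nothingToAwaitOld() {
        return inFlightAsyncCommits.get() == 0;
    }

    // New early return from the PR: commits waiting for the coordinator count too.
    boolean nothingToAwaitNew() {
        return pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0;
    }

    // As described in the review: once the coordinator is found, pending async
    // commits are sent and become in-flight. Single-threaded model only.
    void coordinatorFound() {
        while (pendingAsyncCommits.get() > 0) {
            pendingAsyncCommits.decrementAndGet();
            inFlightAsyncCommits.incrementAndGet();
        }
    }

    // A response arrives for one in-flight commit.
    void responseReceived() {
        if (inFlightAsyncCommits.get() > 0) {
            inFlightAsyncCommits.decrementAndGet();
        }
    }
}
```

   With non-empty offsets, the sync commit itself drives the coordinator lookup, which moves the pending commit to in-flight (`coordinatorFound()` in the model). From then on, both checks agree.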




Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1553899936


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1360,6 +1362,9 @@ public void commitSync(Map offsets, Duration
 Timer requestTimer = time.timer(timeout.toMillis());
 SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer);
 CompletableFuture commitFuture = commit(syncCommitEvent);
+
+awaitPendingAsyncCommits(requestTimer, false);

Review Comment:
   nit: it may help to reflect in the name that this also executes the
callbacks (or keep the name as it is and move the one line, 1406, that
executes the callbacks here, right after this call)
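
   To illustrate the naming suggestion, here is a minimal sketch of the ordering in `commitSync`. It first waits, bounded by the timeout, for earlier async commits, then runs their callbacks on the calling thread. It is a toy `CompletableFuture` model, not `AsyncKafkaConsumer`, and several simplifications apply:
- `ToyAsyncCommitter` and `awaitPendingAsyncCommitsAndExecuteCallbacks` are hypothetical names.
- The sync commit request itself is omitted.
- A timeout is surfaced as an `IllegalStateException` rather than Kafka's `TimeoutException`.
- The second argument of the real `awaitPendingAsyncCommits(requestTimer, false)` is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical toy model: commitAsync()/commitSync() are called from the application
// thread, while commit responses may complete on another thread.
final class ToyAsyncCommitter {
    // Futures that complete once an async commit's callback has been queued.
    private final List<CompletableFuture<Void>> pendingAsyncCommits = new ArrayList<>();
    // Callbacks ready to run; filled by whichever thread completes a response.
    private final ConcurrentLinkedQueue<Runnable> completedCallbacks = new ConcurrentLinkedQueue<>();

    // Returns the "response" future so a caller (or test) can complete it.
    CompletableFuture<Void> commitAsync(Consumer<Throwable> callback) {
        CompletableFuture<Void> response = new CompletableFuture<>();
        CompletableFuture<Void> queued = response.handle((ignored, error) -> {
            completedCallbacks.add(() -> callback.accept(error));
            return null;
        });
        pendingAsyncCommits.add(queued);
        return response;
    }

    void commitSync(long timeoutMs) {
        // The sync commit request itself is omitted; only the ordering matters here.
        awaitPendingAsyncCommitsAndExecuteCallbacks(timeoutMs);
    }

    // The name spells out both steps, as suggested in the review comment.
    private void awaitPendingAsyncCommitsAndExecuteCallbacks(long timeoutMs) {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(pendingAsyncCommits.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while awaiting async commits", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("pending async commits did not complete in time", e);
        }
        pendingAsyncCommits.clear();
        // Run the user callbacks on the calling thread.
        Runnable callback;
        while ((callback = completedCallbacks.poll()) != null) {
            callback.run();
        }
    }
}
```

   Keeping the wait and the callback execution in one clearly named method, or as two adjacent calls, makes it visible at the call site that `commitSync` runs user callbacks.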




Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-03-28 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543492385


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -984,6 +984,8 @@ public void close(final Timer timer) {
 }
 } finally {
 super.close(timer);
+// Super-class close may wait for more commit callbacks to complete.
+invokeCompletedOffsetCommitCallbacks();

Review Comment:
   Agreed: there could be async requests with a known coordinator that don't get
a response in the while loop above, but do get it while `super.close` waits, so
we should trigger the callbacks at this point.
   
   But this makes me notice: aren't we breaking the `close(Duration)` contract
here by calling `super.close(timer)` in the `finally` clause? Say some async
requests don't get a response within the timeout in the while loop above (so we
block for the full timeout there); then, in the `finally`, the superclass blocks
for that time again
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1139).
Am I missing something? (I can file a separate Jira if I'm not.)
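A minimal sketch of the alternative the question implies, assuming `close` is split into two blocking phases that both draw on one deadline. `DeadlineBoundedClose` and its parameters are hypothetical stand-ins, not `ConsumerCoordinator` or `AbstractCoordinator`; the clock is injected so the arithmetic is easy to check.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical sketch: both phases of close() draw on one shared deadline.
class DeadlineBoundedClose {
    private final LongSupplier clockMs; // injectable clock, in milliseconds

    DeadlineBoundedClose(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    // Each phase receives only the time left until the shared deadline, so the
    // total blocking time is bounded by timeoutMs rather than up to 2 * timeoutMs.
    // Returns the elapsed time in milliseconds.
    long close(long timeoutMs, LongConsumer awaitAsyncCommits, LongConsumer superClose) {
        long start = clockMs.getAsLong();
        long deadline = start + timeoutMs;
        try {
            awaitAsyncCommits.accept(remainingMs(deadline));
        } finally {
            // Mirrors the `finally` in the diff, but passes only the leftover budget.
            superClose.accept(remainingMs(deadline));
        }
        return clockMs.getAsLong() - start;
    }

    private long remainingMs(long deadline) {
        return Math.max(0L, deadline - clockMs.getAsLong());
    }
}
```

If the first phase uses up the whole budget, the second phase is handed zero and should return immediately, so `close(Duration)` stays within its timeout.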






Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-03-28 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543492385


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -984,6 +984,8 @@ public void close(final Timer timer) {
 }
 } finally {
 super.close(timer);
+// Super-class close may wait for more commit callbacks to complete.
+invokeCompletedOffsetCommitCallbacks();

Review Comment:
   Agreed: there could be async requests with a known coordinator that don't get
a response in the while loop above, but do get it while `super.close` waits, so
we trigger the callbacks.
   
   But this makes me notice: aren't we breaking the `close(Duration)` contract
here by calling `super.close(timer)` in the `finally` clause? Say some async
requests don't get a response within the timeout in the while loop above (so we
block for the full timeout there); then, in the `finally`, the superclass blocks
for that time again
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1139).
Am I missing something? (I can file a separate Jira if I'm not.)






Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-03-28 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543492385


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -984,6 +984,8 @@ public void close(final Timer timer) {
 }
 } finally {
 super.close(timer);
+// Super-class close may wait for more commit callbacks to complete.
+invokeCompletedOffsetCommitCallbacks();

Review Comment:
   Agreed: there could be async requests with a known coordinator that don't get
a response in the while loop above, but do get it while `super.close` waits, so
we trigger the callbacks. But this makes me notice: aren't we breaking the
`close(Duration)` contract here by calling `super.close(timer)` in the `finally`
clause? Say some async requests don't get a response within the timeout in the
while loop above (so we block for the full timeout there); then, in the
`finally`, the superclass blocks for that time again
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1139).
Am I missing something? (I can file a separate Jira if I'm not.)






Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-03-28 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 
 private boolean invokePendingAsyncCommits(Timer timer) {
-if (inFlightAsyncCommits.get() == 0) {
+if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) {

Review Comment:
   This makes sense to me: it fills a gap in the case of a commit sync with empty
offsets, which skips the path that sends an actual request, and that's why, as I
see it, it loses the guarantee of executing the callbacks.
   
   This makes the logic consistent with what happens when the commit sync has
non-empty offsets. In that case, it does execute the callbacks for previous
async commits that were waiting for the coordinator: the sync commit would be
blocked on the same findCoord request (there's always just one), and the moment
the coordinator is found, the async commit is marked as in-flight
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036),
so it will be considered for callbacks
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121).
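The two counters can be illustrated with a small state model, assuming an async commit is counted as pending while the coordinator is unknown and moves to in-flight once it is sent. `AsyncCommitCounters` and its methods are hypothetical; only the two counter names and the guard expression come from the diff, and the model assumes a single thread drives the transitions.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the two counters from the diff; not ConsumerCoordinator.
// Assumes a single thread drives the transitions.
class AsyncCommitCounters {
    // Async commits waiting for the coordinator to be found (not sent yet).
    final AtomicInteger pendingAsyncCommits = new AtomicInteger();
    // Async commits sent to the coordinator and awaiting a response.
    final AtomicInteger inFlightAsyncCommits = new AtomicInteger();

    void commitAsyncWhileCoordinatorUnknown() {
        pendingAsyncCommits.incrementAndGet();
    }

    // Coordinator found: pending commits are sent and become in-flight.
    // Incrementing in-flight before decrementing pending keeps the sum of the
    // two counters from dipping to zero mid-transition.
    void onCoordinatorFound() {
        int n = pendingAsyncCommits.get();
        inFlightAsyncCommits.addAndGet(n);
        pendingAsyncCommits.addAndGet(-n);
    }

    void onCommitResponse() {
        inFlightAsyncCommits.decrementAndGet();
    }

    // Guard before the change: misses commits still waiting for the coordinator.
    boolean nothingToAwaitBefore() {
        return inFlightAsyncCommits.get() == 0;
    }

    // Guard after the change: both counters must be zero.
    boolean nothingToAwaitAfter() {
        return pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0;
    }
}
```

With one commit queued behind coordinator discovery, the old guard reports nothing to await while the new one does not, which is the empty-offsets gap described above.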
 






Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-03-28 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543068952


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1360,6 +1367,9 @@ public void commitSync(Map offsets, Duration
 Timer requestTimer = time.timer(timeout.toMillis());
 SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer);
 CompletableFuture commitFuture = commit(syncCommitEvent);
+
+invokePendingAsyncCommits(requestTimer, false);

Review Comment:
   In principle, this is only needed when the offsets are empty; otherwise, the
commitSync itself should ensure that all previous async commits are completed,
since the commit requests are ordered and go over the same connection.
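The ordering argument can be sketched with a toy connection that delivers responses strictly in request order. `OrderedConnection` and `SyncCommitSketch` are hypothetical; the model ignores retries, coordinator changes, and commits still waiting for coordinator discovery (the case covered by the `pendingAsyncCommits` check in this PR).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical model of one broker connection whose responses are delivered
// strictly in request order (FIFO). Not Kafka's NetworkClient.
class OrderedConnection {
    private final Deque<String> inFlight = new ArrayDeque<>();
    final List<String> completed = new ArrayList<>();

    void send(String requestId) {
        inFlight.addLast(requestId);
    }

    // Completes responses in order until `requestId` itself has completed.
    void awaitResponse(String requestId) {
        if (!inFlight.contains(requestId)) {
            throw new IllegalStateException("not in flight: " + requestId);
        }
        String done;
        do {
            done = inFlight.removeFirst();
            completed.add(done);
        } while (!done.equals(requestId));
    }

    // Explicitly drains every outstanding request.
    void awaitAll() {
        while (!inFlight.isEmpty()) {
            completed.add(inFlight.removeFirst());
        }
    }

    boolean hasInFlight() {
        return !inFlight.isEmpty();
    }
}

class SyncCommitSketch {
    static void commitSync(OrderedConnection conn, Map<String, Long> offsets, String requestId) {
        if (offsets.isEmpty()) {
            // No request is sent, so nothing orders this call after earlier async
            // commits: an explicit wait is needed in this branch.
            conn.awaitAll();
            return;
        }
        conn.send(requestId);
        // FIFO delivery: by the time this response arrives, every earlier async
        // commit on the same connection has already completed.
        conn.awaitResponse(requestId);
    }
}
```

In this model the explicit wait only matters in the empty-offsets branch, which is why it could in principle be restricted to that case.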






Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-03-28 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543066981


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 
 private boolean invokePendingAsyncCommits(Timer timer) {
-if (inFlightAsyncCommits.get() == 0) {
+if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) {

Review Comment:
   We also need to check that no async commits are pending (i.e. not yet sent
because the coordinator is not known yet).






Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-03-28 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543063391


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -229,7 +229,11 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional groupInstance
 @AfterEach
 public void teardown() {
 this.metrics.close();
-this.coordinator.close(time.timer(0));
+try {
+this.coordinator.close(time.timer(0));

Review Comment:
   Close will now throw if our `asyncCommit` gets fenced, because we actually
attempt to run the callback. If we don't want that, I don't think we can
implement the guarantee that the callback is run in close.
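A minimal sketch of the trade-off, assuming a fencing error is recorded when an async commit response arrives and only surfaces when the completed callbacks are invoked. `CoordinatorSketch`, `FencedCommitException` and `TeardownSketch` are hypothetical (only `invokeCompletedOffsetCommitCallbacks` echoes a name from the diff), and what the real teardown's `try` catches is not shown in the excerpt above.

```java
// Hypothetical sketch; names mirror the review but this is not ConsumerCoordinator.
class FencedCommitException extends RuntimeException {
    FencedCommitException(String message) { super(message); }
}

class CoordinatorSketch implements AutoCloseable {
    // Set when an async commit response reports that this member was fenced.
    private boolean asyncCommitFenced;

    void onAsyncCommitResponse(boolean fenced) {
        if (fenced) {
            asyncCommitFenced = true;
        }
    }

    // Running the completed callbacks is where the fencing error surfaces.
    void invokeCompletedOffsetCommitCallbacks() {
        if (asyncCommitFenced) {
            throw new FencedCommitException("async commit was fenced");
        }
    }

    @Override
    public void close() {
        // Callbacks run before close returns, so a fencing error
        // now propagates out of close().
        invokeCompletedOffsetCommitCallbacks();
    }
}

class TeardownSketch {
    // Test teardown that tolerates the error, in the spirit of the try block in the diff.
    static void teardown(CoordinatorSketch coordinator) {
        try {
            coordinator.close();
        } catch (FencedCommitException e) {
            // Expected for tests whose async commit was fenced; ignore.
        }
    }
}
```

Guaranteeing that callbacks run in close and keeping close free of callback errors pull in opposite directions, so in this model the test teardown absorbs the error instead.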


