Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on PR #15613:
URL: https://github.com/apache/kafka/pull/15613#issuecomment-2040495611

Thanks for the changes @lucasbru, looks good to me overall. This tidies up the whole story of how async commit callbacks are executed. I left some comments, mostly minor, and to make sure we're on the same page about the reasoning behind the change.

Should we update the PR description to refer not only to `consumer.commitSync()` but also to `consumer.close()`, since both are fixed here to ensure that the callbacks of previous async commits are always executed? Thanks!

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554162586

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commit are completed before the consumer
+    // is closed.

Review Comment:
I would extend it with something like: "...before the consumer is closed, **even when no commit sync is performed as part of the close (because auto-commit is disabled, or simply because there are no consumed offsets).**"

As I see it, that's the key point: it's what this PR fixes and what is being tested here. If the call to `consumer.close` performs an actual commit sync (which needs auto-commit enabled and non-empty consumed offsets), then I expect the async callbacks were always called. The contract was not being respected when the commit sync did not happen for one of the reasons mentioned above. Agree?
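The contract in this comment can be illustrated with a toy Java model. It assumes async commit callbacks are queued until the consumer drains them. `close()` performs a final sync commit only with auto-commit enabled and consumed offsets, but it drains the queued callbacks unconditionally. `ToyConsumer` and its methods are hypothetical and are not Kafka APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical toy model (not Kafka code): callbacks of earlier commitAsync calls are
// queued, and close() must run them whether or not it performs a final sync commit.
class ToyConsumer {
    private final Deque<Runnable> pendingAsyncCallbacks = new ArrayDeque<>();
    private final boolean autoCommitEnabled;
    private boolean hasConsumedOffsets = false;
    int syncCommitsOnClose = 0; // exposed so the behaviour can be checked

    ToyConsumer(boolean autoCommitEnabled) {
        this.autoCommitEnabled = autoCommitEnabled;
    }

    void recordConsumption() {
        hasConsumedOffsets = true;
    }

    void commitAsync(long offset, Consumer<Long> callback) {
        // In this toy the "response" is immediate; only the callback invocation is deferred.
        pendingAsyncCallbacks.add(() -> callback.accept(offset));
    }

    void close() {
        // The final sync commit only happens with auto-commit enabled and consumed offsets.
        if (autoCommitEnabled && hasConsumedOffsets) {
            syncCommitsOnClose++;
        }
        // The contract: drain earlier async callbacks unconditionally instead of
        // relying on the (possibly skipped) sync commit to do it.
        while (!pendingAsyncCallbacks.isEmpty()) {
            pendingAsyncCallbacks.poll().run();
        }
    }
}
```

In the test under review, no records are polled before `close()`, so there are no consumed offsets. Even with auto-commit on, the toy performs no sync commit on close, yet both callbacks still run.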
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554177925

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commit are completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commits sent previously with the
+    // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of
+    // `commitSync` (given that it does not time out).
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(1, cb.successCount);
+
+    // Enforce looking up the coordinator
+    consumer.committed(Set(tp, tp2).asJava)

Review Comment:
I would say we don't need this. The `assertEquals` with the call to `committed` above (ln 694) already succeeded, which means the coordinator has already been looked up at this point.
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554165141

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {

Review Comment:
nit: maybe a better name would be testCommitAsyncCompleted**Before**ConsumerCloses (clearer, and consistent with the similar test below).
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554025678

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commit are completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commits sent previously with the
+    // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of
+    // `commitSync` (given that it does not time out).
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(1, cb.successCount);
+
+    // Enforce looking up the coordinator
+    consumer.committed(Set(tp, tp2).asJava)
+
+    // Try with coordinator known
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb)
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava)
+    assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+    assertEquals(2, cb.successCount);
+
+    // Try with empty sync commit
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+    assertEquals(3, cb.successCount);

Review Comment:
ditto (unneeded semi-colon)
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554024796

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commit are completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commits sent previously with the
+    // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of
+    // `commitSync` (given that it does not time out).
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+    assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(1, cb.successCount);
+
+    // Enforce looking up the coordinator
+    consumer.committed(Set(tp, tp2).asJava)
+
+    // Try with coordinator known
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb)
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava)
+    assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+    assertEquals(2, cb.successCount);

Review Comment:
nit: unneeded semi-colon
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554024079

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commit are completed before the consumer
+    // is closed.
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount);

Review Comment:
nit: unneeded semi-colon. The Java-to-Scala jump tricking us... been there :)
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554020736

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ##
@@ -229,7 +229,11 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional groupInstance
     @AfterEach
     public void teardown() {
         this.metrics.close();
-        this.coordinator.close(time.timer(0));
+        try {
+            this.coordinator.close(time.timer(0));

Review Comment:
I see. I would say it's fine to throw the error at the coordinator level (and live with code like this). As I see it, the need for this catch is not actually introduced by this PR. Even before this PR, the coordinator close could throw a fenced exception for async commits that were waiting for the coordinator and got fenced when they completed [here](https://github.com/apache/kafka/blob/fd9c7d2932dee055289b403e37a0bbb631c080a9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L983).
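The comment argues that surfacing a fencing error from the coordinator's close is acceptable, even though tests then need a try/catch in teardown. The Java sketch below shows that pattern. All names are hypothetical: `ToyFencedException` stands in for whatever fencing exception the real coordinator throws, and none of these classes are Kafka APIs. In the sketch, `close()` releases resources in a `finally` block and still propagates the error, and the teardown tolerates it.

```java
// Hypothetical stand-in for a fencing error raised by a completed async commit.
class ToyFencedException extends RuntimeException {
    ToyFencedException(String message) {
        super(message);
    }
}

// Hypothetical coordinator: close() surfaces a fenced async commit as an exception.
class ToyCoordinatorWithFencing {
    boolean asyncCommitFenced = false;
    boolean closed = false;

    void close() {
        try {
            // Invoking completed async-commit callbacks is where a fencing error surfaces.
            if (asyncCommitFenced) {
                throw new ToyFencedException("async commit was fenced");
            }
        } finally {
            closed = true; // resources are released even when the error propagates
        }
    }
}

class ToyTeardown {
    // Returns true if the expected fencing error was swallowed, false if close() was clean.
    static boolean teardown(ToyCoordinatorWithFencing coordinator) {
        try {
            coordinator.close();
            return false;
        } catch (ToyFencedException expected) {
            // Some tests deliberately leave a fenced async commit behind.
            return true;
        }
    }
}
```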
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554011665

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = {

Review Comment:
We have a new `PlaintextConsumerCommitTest` for all commit-related tests. I would say these 2 tests should go there.
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543492385

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ##
@@ -984,6 +984,8 @@ public void close(final Timer timer) {
             }
         } finally {
             super.close(timer);
+            // Super-class close may wait for more commit callbacks to complete.
+            invokeCompletedOffsetCommitCallbacks();

Review Comment:
Agreed. There could be async requests, with a known coordinator, that don't get a response within the commit sync time above but do get one while `super.close` waits. So we should trigger the callbacks at this point.

But this makes me notice: aren't we breaking the `close(Duration)` contract here by calling `super.close(timer)` in the `finally` clause? Say some async requests don't get a response within the timeout in the `while` loop above, so we block for that time in the loop. Then, in `finally`, the superclass blocks for that time again [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1139). Am I missing something? (I can file a separate Jira if I'm not missing something here.)
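The concern above is about time budgeting. If each blocking phase of close gets its own budget, an unresponsive coordinator can make close take up to twice the requested timeout. Whether the real `super.close(timer)` re-blocks depends on how the timer is consumed inside `AbstractCoordinator`, which the comment leaves as an open question. The sketch below only contrasts the two budgeting strategies, assuming a worst case where every wait uses its full allowance. It uses a hypothetical manual clock and deadline class, not Kafka's `Timer`.

```java
// Hypothetical manual clock so the example is deterministic (not Kafka's Time/Timer).
class ManualClock {
    long nowMs = 0;

    void sleep(long ms) {
        nowMs += ms;
    }
}

// A deadline fixed at creation; remainingMs() shrinks as the clock advances.
class SharedDeadline {
    private final ManualClock clock;
    private final long deadlineMs;

    SharedDeadline(ManualClock clock, long timeoutMs) {
        this.clock = clock;
        this.deadlineMs = clock.nowMs + timeoutMs;
    }

    long remainingMs() {
        return Math.max(0, deadlineMs - clock.nowMs);
    }
}

class ToyClose {
    // Worst case: no response arrives, so each phase waits for its whole allowance.
    static void waitPhase(ManualClock clock, long maxWaitMs) {
        clock.sleep(maxWaitMs);
    }

    // A fresh budget per phase: total close time can reach 2 * timeout.
    static long closeWithFreshBudgetPerPhase(long timeoutMs) {
        ManualClock clock = new ManualClock();
        waitPhase(clock, new SharedDeadline(clock, timeoutMs).remainingMs()); // pending-commit loop
        waitPhase(clock, new SharedDeadline(clock, timeoutMs).remainingMs()); // superclass close
        return clock.nowMs;
    }

    // One deadline shared by both phases: total close time stays within the timeout.
    static long closeWithSharedDeadline(long timeoutMs) {
        ManualClock clock = new ManualClock();
        SharedDeadline deadline = new SharedDeadline(clock, timeoutMs);
        waitPhase(clock, deadline.remainingMs()); // pending-commit loop
        waitPhase(clock, deadline.remainingMs()); // superclass close: nothing left to wait
        return clock.nowMs;
    }
}
```

With a 1000 ms timeout, the first strategy takes 2000 ms of (simulated) time and the second takes 1000 ms.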
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ##
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
     }
     private boolean invokePendingAsyncCommits(Timer timer) {
-        if (inFlightAsyncCommits.get() == 0) {
+        if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) {

Review Comment:
This makes sense to me. It fills a gap for a commit sync with empty offsets, which skips the path that sends an actual request. As I see it, that's why it loses the guarantee of executing the callbacks.

This makes the logic consistent with what happens when the commit sync has non-empty offsets. In that case, it does execute the callbacks of previous async commits that were waiting for the coordinator. The sync commit would be blocked on the same FindCoordinator request (there's always just 1). The moment the coordinator is found, the async commit is marked as in-flight [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036), so it would be considered for callbacks [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121). Am I getting the reasoning for the change right?
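A toy Java model makes the gap concrete, under simplifying assumptions. An async commit counts as pending while the coordinator is unknown and as in-flight once it is sent. An empty `commitSync` sends no request of its own, so it only drives earlier async commits to completion if the guard lets it wait. The counter names echo the diff, but `ToyAsyncCommitTracker` and its methods are hypothetical, not `ConsumerCoordinator` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model: "pending" = waiting for the coordinator, "in flight" = sent.
class ToyAsyncCommitTracker {
    int pendingAsyncCommits = 0;
    int inFlightAsyncCommits = 0;
    boolean coordinatorKnown = false;
    final List<String> invokedCallbacks = new ArrayList<>();

    void commitAsync() {
        if (coordinatorKnown) {
            inFlightAsyncCommits++;
        } else {
            pendingAsyncCommits++;
        }
    }

    // FindCoordinator response: every pending commit is now sent (becomes in flight).
    private void coordinatorFound() {
        coordinatorKnown = true;
        inFlightAsyncCommits += pendingAsyncCommits;
        pendingAsyncCommits = 0;
    }

    // All in-flight commits get responses and their callbacks are invoked.
    private void responsesArrive() {
        for (int i = 0; i < inFlightAsyncCommits; i++) {
            invokedCallbacks.add("success");
        }
        inFlightAsyncCommits = 0;
    }

    // Old guard: ignores commits still waiting for the coordinator.
    boolean oldGuardSkipsWaiting() {
        return inFlightAsyncCommits == 0;
    }

    // New guard (as in the diff): also waits when commits are pending.
    boolean newGuardSkipsWaiting() {
        return pendingAsyncCommits == 0 && inFlightAsyncCommits == 0;
    }

    // Empty commitSync: no request of its own, so it only helps if it does not skip waiting.
    void commitSyncWithEmptyOffsets(boolean useNewGuard) {
        boolean skip = useNewGuard ? newGuardSkipsWaiting() : oldGuardSkipsWaiting();
        if (skip) {
            return;
        }
        coordinatorFound();
        responsesArrive();
    }
}
```

With the old guard, an async commit issued before the coordinator is known is silently skipped by an empty `commitSync`. With the new guard, its callback runs before `commitSync` returns.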
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1553899936

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1360,6 +1362,9 @@ public void commitSync(Map offsets, Duration
         Timer requestTimer = time.timer(timeout.toMillis());
         SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer);
         CompletableFuture commitFuture = commit(syncCommitEvent);
+
+        awaitPendingAsyncCommits(requestTimer, false);

Review Comment:
nit: it may help to reflect in the name that this also executes the callbacks. Alternatively, leave the name as it is and move the one line at 1406 that executes the callbacks here, right after this call.
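Below is a rough Java sketch of the ordering this hunk aims for, under stated assumptions. Async commit responses complete on another thread, and their user callbacks are queued for the application thread. `commitSync` waits for earlier async commits within its own deadline (mirroring the reuse of `requestTimer` in the diff), then drains that queue. All class and method names are hypothetical. `awaitPendingAsyncCommitsAndExecuteCallbacks` only illustrates the naming suggestion and is not a name the PR uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch, not AsyncKafkaConsumer code.
class ToyAsyncConsumer {
    // Futures for async commits whose responses may still be outstanding.
    private final List<CompletableFuture<Void>> pendingAsyncCommits = new ArrayList<>();
    // Callbacks of completed async commits, queued for the application thread.
    private final ConcurrentLinkedQueue<Runnable> completedCallbacks = new ConcurrentLinkedQueue<>();

    // `response` stands in for the future completed when the broker answers.
    void commitAsync(CompletableFuture<Void> response, Runnable callback) {
        // handle() runs on whichever thread completes `response`; it only enqueues the
        // callback, and the future it returns completes normally even if `response` failed.
        pendingAsyncCommits.add(response.handle((ignored, error) -> {
            completedCallbacks.add(callback);
            return (Void) null;
        }));
    }

    void commitSync(long timeoutMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        // (Sending and awaiting the sync commit's own request is omitted in this sketch.)
        awaitPendingAsyncCommitsAndExecuteCallbacks(deadlineNanos);
    }

    // The name states both effects, as the review comment suggests.
    private void awaitPendingAsyncCommitsAndExecuteCallbacks(long deadlineNanos) {
        long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
        try {
            CompletableFuture.allOf(pendingAsyncCommits.toArray(new CompletableFuture<?>[0]))
                    .get(remainingNanos, TimeUnit.NANOSECONDS);
        } catch (TimeoutException e) {
            throw new RuntimeException("Timed out waiting for earlier async commits", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e); // not expected: the handle() futures complete normally
        }
        pendingAsyncCommits.clear();
        // Run queued callbacks on the caller's thread before commitSync returns.
        Runnable callback;
        while ((callback = completedCallbacks.poll()) != null) {
            callback.run();
        }
    }
}
```

Because the callbacks run on the caller's thread only after the wait, a counter incremented in a callback is already updated when `commitSync` returns. This matches the `cb.successCount` assertions in the integration test.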
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1553899936 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1360,6 +1362,9 @@ public void commitSync(Map offsets, Duration Timer requestTimer = time.timer(timeout.toMillis()); SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer); CompletableFuture commitFuture = commit(syncCommitEvent); + +awaitPendingAsyncCommits(requestTimer, false); Review Comment: nit: maybe helpful to reflect in the name that this does execute the callbacks (or leave it as it is and then have the one line that does execute the callbacks here, right after) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1543492385 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -984,6 +984,8 @@ public void close(final Timer timer) { } } finally { super.close(timer); +// Super-class close may wait for more commit callbacks to complete. +invokeCompletedOffsetCommitCallbacks(); Review Comment: Agree, there could be async requests, with known coord, not getting a response in the above while loop, then getting it while the super.close waits, so we should trigger the callbacks at this point. But this makes me notice, aren't we breaking the `close(Duration)` contract here, calling that `super.close(timer)` on the finally clause? Let's say async requests that are not getting a response within the timeout in the above while loop (so we block for time on the while), then `finally`, the super class blocks for that time again [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1139). Am I missing something? (I can file a separate Jira if I'm not missing something here) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ##
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
}
private boolean invokePendingAsyncCommits(Timer timer) {
-if (inFlightAsyncCommits.get() == 0) {
+if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) {

Review Comment:
This makes sense to me. As I see it, it fills a gap for a commit sync with empty offsets: that path skips sending an actual request, which is why it loses the guarantee of executing the callbacks. This makes the logic consistent with what happens when the commit sync has non-empty offsets. In that case, it does execute the callbacks for previous async commits that were waiting for the coordinator: the sync commit would be blocked on the same FindCoordinator request (there's always just one), and the moment the coordinator is found, the async commit is marked as in-flight [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036), so it will be considered for callbacks [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121).
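The two counters in the diff can be modeled as a small lifecycle: a commit issued while the coordinator is unknown is counted as pending, moves to in-flight once the coordinator is found, and leaves the count when its response arrives. This is a toy sketch under that assumption; `ToyAsyncCommitCounters` and its methods are hypothetical, and only the counter names and the guard condition come from the diff.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy bookkeeping for async commits; the counter names mirror the diff,
// but the transitions are a simplified assumption, not Kafka's code.
final class ToyAsyncCommitCounters {
    final AtomicInteger pendingAsyncCommits = new AtomicInteger();  // waiting for a coordinator
    final AtomicInteger inFlightAsyncCommits = new AtomicInteger(); // sent, awaiting a response

    void commitAsync(boolean coordinatorKnown) {
        if (coordinatorKnown) {
            inFlightAsyncCommits.incrementAndGet();
        } else {
            pendingAsyncCommits.incrementAndGet();
        }
    }

    // Once the coordinator is found, the pending commits are sent and become in-flight.
    // In-flight is incremented first so the guard never briefly sees both counters at zero.
    void onCoordinatorFound() {
        int n = pendingAsyncCommits.get();
        inFlightAsyncCommits.addAndGet(n);
        pendingAsyncCommits.addAndGet(-n);
    }

    void onCommitResponse() {
        inFlightAsyncCommits.decrementAndGet();
    }

    // The guard from the diff: nothing to wait for only if both counters are zero.
    boolean nothingToAwait() {
        return pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0;
    }
}
```

With only the old `inFlightAsyncCommits.get() == 0` check, a commit that is still waiting for the coordinator would be treated as nothing to await.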
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543068952

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1360,6 +1367,9 @@ public void commitSync(Map offsets, Duration
Timer requestTimer = time.timer(timeout.toMillis());
SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer);
CompletableFuture commitFuture = commit(syncCommitEvent);
+
+invokePendingAsyncCommits(requestTimer, false);

Review Comment:
In principle, this is only needed when the offsets are empty. Otherwise, the commitSync itself should ensure that all previous async commits have completed, since the commit requests are ordered and go over the same connection.
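The ordering argument can be sketched with a toy connection that answers requests strictly in FIFO order, assuming that simplification of the real network client. `OrderedConnection` and `ToyCommitSync` are hypothetical names: a sync commit with offsets cannot get its response before earlier async commits do, while an empty-offsets commit sends nothing and so forces nothing to finish.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Toy model of one broker connection that answers requests strictly in order.
final class OrderedConnection {
    private final Deque<String> outstanding = new ArrayDeque<>();
    final List<String> completed = new ArrayList<>();

    void send(String requestId) {
        outstanding.addLast(requestId);
    }

    // Delivers responses in FIFO order up to and including the target request.
    void awaitResponse(String requestId) {
        while (!outstanding.isEmpty()) {
            String done = outstanding.pollFirst();
            completed.add(done);
            if (done.equals(requestId)) {
                return;
            }
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }
}

final class ToyCommitSync {
    // Non-empty offsets: the request queues behind earlier async commits, so they
    // complete first. Empty offsets: no request is sent, so nothing waits for them.
    static void commitSync(OrderedConnection conn, Map<String, Long> offsets, String requestId) {
        if (offsets.isEmpty()) {
            return;
        }
        conn.send(requestId);
        conn.awaitResponse(requestId);
    }
}
```

The empty-offsets branch is the case where an explicit wait for pending async commits is still needed.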
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543066981

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ##
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
}
private boolean invokePendingAsyncCommits(Timer timer) {
-if (inFlightAsyncCommits.get() == 0) {
+if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) {

Review Comment:
We also need to check that no async commits are pending (i.e., not yet sent because the coordinator is not known yet).
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543063391

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ##
@@ -229,7 +229,11 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional groupInstance
@AfterEach
public void teardown() {
this.metrics.close();
-this.coordinator.close(time.timer(0));
+try {
+this.coordinator.close(time.timer(0));

Review Comment:
Close will now throw if our `asyncCommit` gets fenced, because we actually attempt to run the callback. If we don't want that, I don't think we can implement the guarantee that the callback is run in close.
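A toy sketch of the trade-off described here, assuming a simplified coordinator whose close runs the callbacks of completed async commits and rethrows a fencing error instead of dropping it. `ToyFencedException`, `ToyCoordinator`, and `ToyTeardown` are hypothetical names; the sketch does not claim what the truncated test change catches, only that a teardown must now tolerate an exception from close.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a fencing error reported by an async commit.
class ToyFencedException extends RuntimeException {
    ToyFencedException(String message) {
        super(message);
    }
}

final class ToyCoordinator {
    // Outcome of each completed async commit: null on success, otherwise the error.
    private final List<RuntimeException> completedCommitErrors = new ArrayList<>();
    int callbacksRun = 0;

    void onAsyncCommitCompleted(RuntimeException errorOrNull) {
        completedCommitErrors.add(errorOrNull);
    }

    // close() runs the callbacks of completed commits; a fencing error is rethrown
    // to the caller instead of being silently dropped.
    void close() {
        while (!completedCommitErrors.isEmpty()) {
            RuntimeException error = completedCommitErrors.remove(0);
            callbacksRun++;
            if (error instanceof ToyFencedException) {
                throw error;
            }
        }
    }
}

final class ToyTeardown {
    // Teardown-style close that tolerates the fencing error.
    static boolean closeQuietly(ToyCoordinator coordinator) {
        try {
            coordinator.close();
            return true;
        } catch (ToyFencedException e) {
            return false;
        }
    }
}
```

Running the callbacks in close is what makes the error visible there; suppressing it inside close would mean not honoring the guarantee that the callback runs.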