Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-06 Thread via GitHub


philipnee commented on PR #14920:
URL: https://github.com/apache/kafka/pull/14920#issuecomment-1843868318

   Thanks @lucasbru and @kirktrue. I'm closing this one and opening a new one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-06 Thread via GitHub


philipnee closed pull request #14920: KAFKA-15696: Refactor AsyncConsumer close procedure
URL: https://github.com/apache/kafka/pull/14920





Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14920:
URL: https://github.com/apache/kafka/pull/14920#discussion_r1417647914


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -957,6 +966,57 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }
 
+    /**
+     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+     * 1. autocommit offsets
+     * 2. revoke all partitions
+     */
+    private void prepareShutdown(final Timer timer) {
+        if (!groupMetadata.isPresent())
+            return;
+
+        maybeAutoCommitSync(timer);
+        timer.update();
+        if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
+            return;
+
+        try {
+            // If the consumer is in a group, we will pause and revoke all assigned partitions
+            onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+            timer.update();
+        } catch (Exception e) {
+            Exception exception = e;
+            if (e instanceof ExecutionException)
+                exception = (Exception) e.getCause();
+            throw new KafkaException("User rebalance callback throws an error", exception);
+        } finally {
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+        }
+    }
+
+    private void maybeAutoCommitSync(final Timer timer) {
+        if (autoCommitEnabled) {
+            Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
+            try {
+                log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
+                commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+            } catch (Exception e) {
+                // consistent with async auto-commit failures, we do not propagate the exception
+                log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
+            }
+        }
+    }
+
+    private CompletableFuture<Void> onLeavePrepare() {

Review Comment:
   I need to speak to Kirk about how he wants to implement the callback invocation.
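The sequencing that `prepareShutdown` and `maybeAutoCommitSync` aim for (auto-commit first and swallow its failure, then wait for revocation with whatever time is left, and clear the assignment on every path) can be illustrated in isolation. This is a minimal sketch under stated assumptions, not the PR's code: `Deadline` is a hypothetical stand-in for Kafka's internal `Timer`, `steps` and `assignmentCleared` are illustrative fields, and the commit and the revocation are simulated with a `Runnable` and a `CompletableFuture`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of a deadline-bounded shutdown sequence: commit first, then revoke with the time that is left.
// Deadline, steps and assignmentCleared are hypothetical stand-ins, not Kafka classes or fields.
class ShutdownSequenceSketch {

    // Hypothetical stand-in for Kafka's internal Timer: one deadline that every step draws from.
    static final class Deadline {
        private final long deadlineNanos;

        Deadline(long timeoutMs) {
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        }

        long remainingMs() {
            return Math.max(0L, TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime()));
        }
    }

    // Records the order in which the steps ran.
    final List<String> steps = new ArrayList<>();
    boolean assignmentCleared = false;

    // Step 1: synchronous auto-commit; a failure is recorded and swallowed, never rethrown.
    void maybeAutoCommitSync(Runnable commit) {
        steps.add("commit");
        try {
            commit.run();
        } catch (RuntimeException e) {
            steps.add("commit-failed");
        }
    }

    // Step 2: wait for revocation, bounded by whatever time the commit left over.
    void prepareShutdown(Deadline deadline, Runnable commit, CompletableFuture<Void> revocation) {
        maybeAutoCommitSync(commit);
        steps.add("revoke");
        try {
            revocation.get(deadline.remainingMs(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Surface the callback's own failure rather than the ExecutionException wrapper.
            throw new IllegalStateException("Rebalance callback failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while revoking", e);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Revocation did not finish before the deadline", e);
        } finally {
            // Like assignFromSubscribed(Collections.emptySet()): drop the assignment on every path.
            assignmentCleared = true;
        }
    }

    public static void main(String[] args) {
        ShutdownSequenceSketch sketch = new ShutdownSequenceSketch();
        sketch.prepareShutdown(new Deadline(1_000),
                () -> { throw new RuntimeException("commit failed"); },
                CompletableFuture.completedFuture(null));
        System.out.println(sketch.steps + " cleared=" + sketch.assignmentCleared);
    }
}
```

Passing `e.getCause()` through as a `Throwable`, as the sketch does, sidesteps the `(Exception)` cast in the quoted hunk, which would throw `ClassCastException` if a callback failed with an `Error`.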






Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14920:
URL: https://github.com/apache/kafka/pull/14920#discussion_r1417647206


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
+    private CompletableFuture<Void> onLeavePrepare() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+        if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // TODO: Invoke rebalanceListener via KAFKA-15276
+        return CompletableFuture.completedFuture(null);
+    }
+

Review Comment:
   I want to be consistent with Kirk's approach to callback invocation in unsubscribe(), so I've taken back my previous comment (sorry about the confusion).






Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-06 Thread via GitHub


lucasbru commented on code in PR #14920:
URL: https://github.com/apache/kafka/pull/14920#discussion_r1417523313


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java:
##
@@ -331,34 +331,35 @@ public AsyncKafkaConsumerTestBuilder(Optional groupInfo, boole
 super(groupInfo, enableAutoCommit, enableAutoTick);
 String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
 List<ConsumerPartitionAssignor> assignors = ConsumerPartitionAssignor.getAssignorInstances(
-config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
+config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
 );
 Deserializers<String, String> deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());
 this.fetchCollector = spy(new FetchCollector<>(logContext,
-metadata,
-subscriptions,
-fetchConfig,
-deserializers,
-metricsManager,
-time));
+metadata,
+subscriptions,
+fetchConfig,
+deserializers,

Review Comment:
   It seems like your formatter settings are slightly different from someone else's. I wouldn't make these kinds of whitespace changes unless the existing formatting is obviously unclean.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
+    private CompletableFuture<Void> onLeavePrepare() {

Review Comment:
   I may be missing a bit of context, but I'm not yet sure what this function achieves. If this is for KAFKA-15276, maybe we can implement it as part of that ticket, because right now this function mostly confuses me.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -436,10 +434,15 @@ public void testCommitAsyncLeaderEpochUpdate() {
         topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), ""));
 
         consumer.assign(Arrays.asList(t0, t1));
+        consumer.seek(t0, 10);
+        consumer.seek(t1, 20);
 
         CompletableFuture<Void> commitFuture = new CompletableFuture<>();
         commitFuture.complete(null);
 
+        prepareCommit(Arrays.asList(t1, t0), DEFAULT_GROUP_ID, Errors.NONE);
+        // TODO: The log shows NPE thrown from the CommitRequestManager, which is caused by the use of mock.

Review Comment:
   I think this is fine as long as the test isn't failing, since the problem should go away once we move to mocks.




Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-06 Thread via GitHub


lucasbru commented on code in PR #14920:
URL: https://github.com/apache/kafka/pull/14920#discussion_r1417484146


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
+    private CompletableFuture<Void> onLeavePrepare() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+        if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // TODO: Invoke rebalanceListener via KAFKA-15276
+        return CompletableFuture.completedFuture(null);
+    }
+

Review Comment:
   Several people seem to agree that we should solve this as much as possible 
via an event. Is the new draft PR going to replace this PR, or should we try to 
merge this one?






Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-06 Thread via GitHub


philipnee commented on PR #14920:
URL: https://github.com/apache/kafka/pull/14920#issuecomment-1842376289

   @kirktrue Thanks for reviewing the code. I did some refactoring on a separate branch, which I hope is in line with what you meant: https://github.com/apache/kafka/pull/14937 cc @lucasbru @cadonna: I wonder whether #14937 is easier to read.





Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-05 Thread via GitHub


philipnee commented on code in PR #14920:
URL: https://github.com/apache/kafka/pull/14920#discussion_r1416732684


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
+    private CompletableFuture<Void> onLeavePrepare() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+        if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // TODO: Invoke rebalanceListener via KAFKA-15276
+        return CompletableFuture.completedFuture(null);
+    }
+

Review Comment:
   Thanks. I mostly agree with your idea. I think simply firing the revocation callback from close() should be enough, but sending leave-group and closing events as you suggested is a good idea.
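A rough sketch of what firing the revocation callback directly from `close()` could look like, assuming the listener receives the partitions in a deterministic topic-then-partition order (presumably what `MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR` provides; that ordering is an assumption here). `Partition`, `RevocationListener`, `PARTITION_ORDER` and `revokeOnClose` are hypothetical stand-ins for `TopicPartition`, `ConsumerRebalanceListener` and the consumer's internals, so the block runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch: Partition and RevocationListener stand in for TopicPartition and
// ConsumerRebalanceListener so the example runs without Kafka on the classpath.
class RevokeOnCloseSketch {

    static final class Partition {
        private final String topic;
        private final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() { return topic; }
        int partition() { return partition; }

        @Override
        public String toString() { return topic + "-" + partition; }
    }

    interface RevocationListener {
        void onPartitionsRevoked(Collection<Partition> partitions);
    }

    // Assumed ordering: by topic name, then by partition number.
    static final Comparator<Partition> PARTITION_ORDER =
            Comparator.comparing(Partition::topic).thenComparingInt(Partition::partition);

    // Fires the revocation callback on the calling thread; the assignment is cleared even if it throws.
    static void revokeOnClose(boolean autoAssigned, List<Partition> assigned, RevocationListener listener) {
        if (!autoAssigned || assigned.isEmpty())
            return; // manual assignment or nothing owned: there is no callback to fire
        SortedSet<Partition> dropped = new TreeSet<>(PARTITION_ORDER);
        dropped.addAll(assigned);
        try {
            listener.onPartitionsRevoked(Collections.unmodifiableSortedSet(dropped));
        } catch (RuntimeException e) {
            throw new IllegalStateException("User rebalance callback threw an error", e);
        } finally {
            assigned.clear();
        }
    }

    public static void main(String[] args) {
        List<Partition> assigned = new ArrayList<>(List.of(
                new Partition("orders", 2), new Partition("audit", 0), new Partition("orders", 0)));
        revokeOnClose(true, assigned, revoked -> System.out.println("revoked " + revoked));
        System.out.println("still assigned " + assigned);
    }
}
```

Clearing the assignment in `finally` mirrors the quoted `assignFromSubscribed(Collections.emptySet())` call, so a failing user callback still leaves the consumer owning no partitions.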






Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-05 Thread via GitHub


kirktrue commented on code in PR #14920:
URL: https://github.com/apache/kafka/pull/14920#discussion_r1416404354


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
+    private CompletableFuture<Void> onLeavePrepare() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+        if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // TODO: Invoke rebalanceListener via KAFKA-15276
+        return CompletableFuture.completedFuture(null);
+    }
+

Review Comment:
   I would imagine that the leave-group process could mostly be performed using an event and a callback execution, like in #14931.
   
   We'd need to submit an event to the background thread (e.g. `PrepareCloseEvent`) so that the membership manager can orchestrate the callback request and the heartbeat request.
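The event-based flow can be sketched with two queues: the application thread submits a `PrepareCloseEvent` and services callback requests while it waits, and the background thread (playing the membership manager's role) requests the revocation callback, then sends the leave-group heartbeat. Everything here is hypothetical: the class names echo the discussion but are not Kafka's, the heartbeat is only a log entry, and user code is kept on the application thread on the assumption that rebalance callbacks must not run on the background thread.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: none of these types exist in Kafka under these names.
class PrepareCloseEventSketch {

    // Application -> background: "prepare to close"; completed once the leave-group step is done.
    static final class PrepareCloseEvent {
        final CompletableFuture<Void> done = new CompletableFuture<>();
    }

    // Background -> application: "run the revocation callback on your thread".
    static final class CallbackRequest {
        final CompletableFuture<Void> done = new CompletableFuture<>();
    }

    final BlockingQueue<PrepareCloseEvent> toBackground = new LinkedBlockingQueue<>();
    final BlockingQueue<CallbackRequest> toApplication = new LinkedBlockingQueue<>();
    final List<String> log = new CopyOnWriteArrayList<>();

    // Background thread, acting as the membership manager for a single PrepareCloseEvent.
    void backgroundLoop() {
        try {
            PrepareCloseEvent event = toBackground.take();
            CallbackRequest request = new CallbackRequest();
            toApplication.put(request);
            try {
                request.done.join();                   // wait until the callback has run
                log.add("leave-group heartbeat sent"); // stand-in for the real heartbeat request
                event.done.complete(null);
            } catch (CompletionException e) {
                event.done.completeExceptionally(e.getCause());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Application thread: submit the event, then run callback requests until done or out of time.
    void prepareClose(long timeoutMs, Runnable revocationCallback)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        PrepareCloseEvent event = new PrepareCloseEvent();
        toBackground.put(event);
        while (!event.done.isDone()) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0)
                throw new TimeoutException("Prepare-close did not finish before the deadline");
            CallbackRequest request = toApplication.poll(
                    Math.min(remaining, TimeUnit.MILLISECONDS.toNanos(10)), TimeUnit.NANOSECONDS);
            if (request == null)
                continue;
            try {
                revocationCallback.run(); // user code runs on the application thread
                log.add("revocation callback ran");
                request.done.complete(null);
            } catch (RuntimeException e) {
                request.done.completeExceptionally(e); // unblock the background thread
                throw e;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        PrepareCloseEventSketch sketch = new PrepareCloseEventSketch();
        Thread background = new Thread(sketch::backgroundLoop, "background");
        background.start();
        sketch.prepareClose(1_000, () -> System.out.println("onPartitionsRevoked"));
        background.join();
        System.out.println(sketch.log);
    }
}
```

Polling in short slices keeps the application thread responsive to callback requests while still honoring the overall timeout; a real implementation would also have to cope with the background thread shutting down mid-wait.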



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -931,15 +941,14 @@ private void close(Duration timeout, boolean swallowException) {
         final Timer closeTimer = time.timer(timeout);
         clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
         closeTimer.update();
-
+        // Prepare shutting down the network thread
+        swallow(log, Level.ERROR, "Unexpected exception when preparing for shutdown", () -> prepareShutdown(closeTimer), firstException);
+        closeTimer.update();
         if (applicationEventHandler != null)
-            closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException);
-
-        // Invoke all callbacks after the background thread exists in case if there are unsent async
-        // commits
-        maybeInvokeCommitCallbacks();
-
-        closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException);
+            closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
+        closeTimer.update();
+        // Ensure all async commit callbacks are invoked
+        swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback", this::maybeInvokeCommitCallbacks, firstException);

Review Comment:
   These should be done before we shut down the network, right?


