lianetm commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1836778721
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -496,8 +496,9 @@ private void clearAssignment() {
*/
private void
updateSubscriptionAwaitingCallback(SortedSet<TopicIdPartition>
assignedPartitions,
SortedSet<TopicPartition>
addedPartitions) {
- Collection<TopicPartition> assignedTopicPartitions =
toTopicPartitionSet(assignedPartitions);
+ Set<TopicPartition> assignedTopicPartitions =
toTopicPartitionSet(assignedPartitions);
subscriptions.assignFromSubscribedAwaitingCallback(assignedTopicPartitions,
addedPartitions);
+ notifyAssignmentChange(assignedTopicPartitions);
Review Comment:
We notify on the new-assignment path and on leave, but I think we're missing
the fenced/fatal paths.
Both of those end up calling `clearAssignment`, so maybe we could notify there
to cover them (right after line 483,
`subscriptions.assignFromSubscribed(Collections.emptySet())`)?
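A minimal sketch of the suggestion, using a hypothetical simplified class
rather than the real `AbstractMembershipManager`. Only `clearAssignment` and
`notifyAssignmentChange` are names taken from the PR; `MembershipSketch`,
`updateAssignment`, the two transition methods, and the `notifications` list
are assumptions made for illustration:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the membership manager; not the real Kafka class.
class MembershipSketch {
    // Records every assignment passed to notifyAssignmentChange so calls can be inspected.
    final List<Set<String>> notifications = new ArrayList<>();
    private Set<String> assignment = Collections.emptySet();

    // New-assignment path: the PR already notifies here.
    void updateAssignment(Set<String> partitions) {
        assignment = new TreeSet<>(partitions);
        notifyAssignmentChange(assignment);
    }

    // Shared by the fenced and fatal paths, so notifying here covers both.
    void clearAssignment() {
        assignment = Collections.emptySet();
        notifyAssignmentChange(assignment);
    }

    // Assumed shape of the fenced/fatal transitions: both clear the assignment.
    void transitionToFenced() {
        clearAssignment();
    }

    void transitionToFatal() {
        clearAssignment();
    }

    private void notifyAssignmentChange(Set<String> partitions) {
        // Store an immutable copy so later changes do not alter the record.
        notifications.add(Collections.unmodifiableSet(new TreeSet<>(partitions)));
    }
}
```

With the notification inside `clearAssignment`, neither transition needs its
own call, and any future path that clears the assignment is covered as well.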
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -523,18 +524,45 @@ public void transitionToJoining() {
/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the
assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request
and leave the group.
- * This is expected to be invoked when the user calls the unsubscribe API.
+ * This is expected to be invoked when the user calls the {@link
Consumer#close()} API.
+ *
+ * @return Future that will complete when the heartbeat to leave the group
has been sent out.
+ */
+ public CompletableFuture<Void> leaveGroupOnClose() {
+ return leaveGroup(false);
+ }
+
+ /**
+ * Transition to {@link MemberState#PREPARE_LEAVING} to release the
assignment. Once completed,
+ * transition to {@link MemberState#LEAVING} to send the heartbeat request
and leave the group.
+ * This is expected to be invoked when the user calls the {@link
Consumer#unsubscribe()} API.
*
* @return Future that will complete when the callback execution completes
and the heartbeat
* to leave the group has been sent out.
*/
public CompletableFuture<Void> leaveGroup() {
+ return leaveGroup(true);
+ }
+
+ /**
+ * Transition to {@link MemberState#PREPARE_LEAVING} to release the
assignment. Once completed,
+ * transition to {@link MemberState#LEAVING} to send the heartbeat request
and leave the group.
+ * This is expected to be invoked when the user calls the unsubscribe API.
Review Comment:
```suggestion
* This is expected to be invoked when the user calls the unsubscribe
API or is closing the consumer.
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1216,6 +1235,81 @@ public void close(Duration timeout) {
}
}
+ /**
+ * Please keep these tenets in mind for the implementation of the {@link
AsyncKafkaConsumer}’s
+ * {@link #close(Duration)} method. In the future, these tenets may be
made officially part of the top-level
+ * {@link KafkaConsumer#close(Duration)} API, but for now they remain here.
+ *
+ * <ol>
+ * <li>
+ * The execution of the {@link ConsumerRebalanceListener} callback
(if applicable) must be performed on
+ * the application thread to ensure it does not interfere with the
network I/O on the background thread.
+ * </li>
+ * <li>
+ * The {@link ConsumerRebalanceListener} callback execution must
complete before an attempt to leave
+ * the consumer group is performed. In this context, “complete”
does not necessarily imply
+ * <em>success</em>; execution is “complete” even if the execution
<em>fails</em> with an error.
+ * </li>
+ * <li>
+ * Any error thrown during the {@link ConsumerRebalanceListener}
callback execution will be caught to
+ * ensure it does not prevent execution of the remaining {@link
#close()} logic.
+ * </li>
+ * <li>
+ * The application thread will be blocked during the entire
duration of the execution of the
+ * {@link ConsumerRebalanceListener}. The {@link #close()} method
does not employ a mechanism to
+ * short-circuit the callback execution.
+ * </li>
+ * <li>
+ * Since the {@link ConsumerRebalanceListener} APIs do not include
a timeout parameter, a given
+ * {@link ConsumerRebalanceListener} implementation cannot alter
its behavior to adhere to the timeout.
+ * </li>
+ * <li>
+ * A given {@link ConsumerRebalanceListener} implementation may be
affected by the application thread's
+ * interrupt state. If the callback implementation performs any
blocking operations, it may result in
+ * an error. An implementation may choose to preemptively check
the thread's interrupt flag via
+ * {@link Thread#isInterrupted()} or {@link
Thread#isInterrupted()} and alter its behavior.
+ * </li>
+ * <li>
+ * If the application thread was interrupted <em>prior</em> to the
execution of the
+ * {@link ConsumerRebalanceListener} callback, the thread's
interrupt state will be preserved for the
+ * {@link ConsumerRebalanceListener} execution.
+ * </li>
+ * <li>
+ * If the application thread was interrupted <em>prior</em> to the
execution of the
+ * {@link ConsumerRebalanceListener} callback <em>but</em> the
callback cleared out the interrupt state,
+ * the {@link #close()} method will not make any effort to restore
the application thread's interrupt
+ * state for the remainder of the execution of {@link #close()}.
+ * </li>
+ * <li>
+ * The consumer will attempt to leave the group on a “best-case”
basis. There is no stated guarantee
+ * that the consumer will have successfully left the group before
the {@link #close()} method
+ * completes processing.
+ * </li>
+ * <li>
+ * Leaving the consumer group is achieved by issuing a ‘leave
group‘ network request. This network I/O
+ * must be performed on the background thread.
+ * </li>
+ * <li>
+ * The consumer will attempt to leave the group regardless of the
timeout elapsing or the application
+ * thread receiving an {@link InterruptException} or {@link
InterruptedException}.
+ * </li>
+ * <li>
+ * The application thread will wait for confirmation that the
consumer left the group until one of the
+ * following occurs:
+ *
+ * <ol>
+ * <li>Confirmation that the ’leave group‘ response was
received from the group coordinator</li>
+ * <li>The timeout provided by the user elapses</li>
+ * <li>An {@link InterruptException} or {@link
InterruptedException} is thrown</li>
+ * </ol>
+ * </li>
+ * <li>
+ * The {@link #close(Duration)} method should otherwise honor the
timeout and interrupt flag,
+ * except where it would violate the previous tenets.
Review Comment:
I wonder if we're making this sound more complicated than it is by saying
"except where it would violate the previous tenets", given that there are 12
tenets above :). Close always honours the timeout, with callbacks as the only
exception, right? (That is already mentioned in point 4.) If we simplify as
mentioned above, adding to point 4 the fact that callbacks are not
time-bounded, I would say we don't need this one. Makes sense?
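The simplified rule (callbacks are not time-bounded, everything else honours
the timeout) can be sketched as below. This is an illustration under stated
assumptions, not the `AsyncKafkaConsumer` implementation: `CloseSketch`, its
`close` signature, and the choice to start the deadline before the callback
runs are all hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; none of these names come from AsyncKafkaConsumer.
class CloseSketch {
    /**
     * Runs the callback to completion, then waits for the leave-group future
     * for whatever is left of the timeout.
     *
     * @return true if the leave completed before the deadline, false otherwise
     */
    static boolean close(Runnable rebalanceCallback,
                         CompletableFuture<Void> leaveGroup,
                         Duration timeout) {
        // Assumption: the deadline starts before the callback runs.
        long deadlineNanos = System.nanoTime() + timeout.toNanos();

        try {
            // Not time-bounded: the calling thread blocks until the callback returns.
            rebalanceCallback.run();
        } catch (RuntimeException e) {
            // Swallowed so that a failing callback does not skip the leave attempt.
        }

        // Only this wait is bounded by the user-supplied timeout.
        long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
        try {
            leaveGroup.get(remainingNanos, TimeUnit.NANOSECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Here a callback that throws still lets the leave attempt proceed, and a leave
future that never completes makes `close` return once the timeout elapses.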
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1216,6 +1235,81 @@ public void close(Duration timeout) {
}
}
+ /**
+ * Please keep these tenets in mind for the implementation of the {@link
AsyncKafkaConsumer}’s
+ * {@link #close(Duration)} method. In the future, these tenets may be
made officially part of the top-level
+ * {@link KafkaConsumer#close(Duration)} API, but for now they remain here.
+ *
+ * <ol>
+ * <li>
+ * The execution of the {@link ConsumerRebalanceListener} callback
(if applicable) must be performed on
+ * the application thread to ensure it does not interfere with the
network I/O on the background thread.
+ * </li>
+ * <li>
+ * The {@link ConsumerRebalanceListener} callback execution must
complete before an attempt to leave
+ * the consumer group is performed. In this context, “complete”
does not necessarily imply
+ * <em>success</em>; execution is “complete” even if the execution
<em>fails</em> with an error.
+ * </li>
+ * <li>
+ * Any error thrown during the {@link ConsumerRebalanceListener}
callback execution will be caught to
+ * ensure it does not prevent execution of the remaining {@link
#close()} logic.
+ * </li>
+ * <li>
+ * The application thread will be blocked during the entire
duration of the execution of the
+ * {@link ConsumerRebalanceListener}. The {@link #close()} method
does not employ a mechanism to
+ * short-circuit the callback execution.
+ * </li>
+ * <li>
+ * Since the {@link ConsumerRebalanceListener} APIs do not include
a timeout parameter, a given
+ * {@link ConsumerRebalanceListener} implementation cannot alter
its behavior to adhere to the timeout.
Review Comment:
I find this closely related to the one above (it is already implied there).
Callbacks block for their full execution and are not time-bounded, that's it.
Could we simplify by merging them (just adding the "not time-bounded" bit
above)? I'm trying to simplify; this doc got a bit too complicated, I would say.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1216,6 +1235,81 @@ public void close(Duration timeout) {
}
}
+ /**
+ * Please keep these tenets in mind for the implementation of the {@link
AsyncKafkaConsumer}’s
+ * {@link #close(Duration)} method. In the future, these tenets may be
made officially part of the top-level
+ * {@link KafkaConsumer#close(Duration)} API, but for now they remain here.
+ *
+ * <ol>
+ * <li>
+ * The execution of the {@link ConsumerRebalanceListener} callback
(if applicable) must be performed on
+ * the application thread to ensure it does not interfere with the
network I/O on the background thread.
+ * </li>
+ * <li>
+ * The {@link ConsumerRebalanceListener} callback execution must
complete before an attempt to leave
+ * the consumer group is performed. In this context, “complete”
does not necessarily imply
+ * <em>success</em>; execution is “complete” even if the execution
<em>fails</em> with an error.
+ * </li>
+ * <li>
+ * Any error thrown during the {@link ConsumerRebalanceListener}
callback execution will be caught to
+ * ensure it does not prevent execution of the remaining {@link
#close()} logic.
+ * </li>
+ * <li>
+ * The application thread will be blocked during the entire
duration of the execution of the
+ * {@link ConsumerRebalanceListener}. The {@link #close()} method
does not employ a mechanism to
+ * short-circuit the callback execution.
+ * </li>
+ * <li>
+ * Since the {@link ConsumerRebalanceListener} APIs do not include
a timeout parameter, a given
+ * {@link ConsumerRebalanceListener} implementation cannot alter
its behavior to adhere to the timeout.
+ * </li>
+ * <li>
+ * A given {@link ConsumerRebalanceListener} implementation may be
affected by the application thread's
+ * interrupt state. If the callback implementation performs any
blocking operations, it may result in
+ * an error. An implementation may choose to preemptively check
the thread's interrupt flag via
+ * {@link Thread#isInterrupted()} or {@link
Thread#isInterrupted()} and alter its behavior.
+ * </li>
+ * <li>
+ * If the application thread was interrupted <em>prior</em> to the
execution of the
+ * {@link ConsumerRebalanceListener} callback, the thread's
interrupt state will be preserved for the
+ * {@link ConsumerRebalanceListener} execution.
+ * </li>
+ * <li>
+ * If the application thread was interrupted <em>prior</em> to the
execution of the
+ * {@link ConsumerRebalanceListener} callback <em>but</em> the
callback cleared out the interrupt state,
+ * the {@link #close()} method will not make any effort to restore
the application thread's interrupt
+ * state for the remainder of the execution of {@link #close()}.
+ * </li>
+ * <li>
+ * The consumer will attempt to leave the group on a “best-case”
basis. There is no stated guarantee
+ * that the consumer will have successfully left the group before
the {@link #close()} method
+ * completes processing.
+ * </li>
+ * <li>
+ * Leaving the consumer group is achieved by issuing a ‘leave
group‘ network request. This network I/O
+ * must be performed on the background thread.
Review Comment:
True, but this is not specific to close in any way, right? (It's the same
for all API calls that issue requests.) Would it be better/simpler to focus on
the tenets of close itself?