kirktrue commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1823645974
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1813,36 +1832,64 @@ private boolean processBackgroundEvents() {
* execution of the rebalancing logic. The rebalancing logic cannot complete until the
* {@link ConsumerRebalanceListener} callback is performed.
*
- * @param future Event that contains a {@link CompletableFuture}; it is on this future that the
- * application thread will wait for completion
- * @param timer Overall timer that bounds how long to wait for the event to complete
- * @param ignoreErrorEventException Predicate to ignore background errors.
- * Any exceptions found while processing background events that match the predicate won't be propagated.
- * @return {@code true} if the event completed within the timeout, {@code false} otherwise
+ * <p/>
+ *
+ * There is a conflict between the needs of the {@link ConsumerRebalanceListener} and internal event processing
+ * when it comes to handling the current thread's interrupt state. To maintain compatibility with the
+ * {@link ClassicKafkaConsumer}'s handling of rebalance listeners, the interrupt state for the current thread
+ * will be preserved when invoking callbacks. However, because of the internal use of {@link Future#get()} to
+ * wait for event responses, the current thread cannot exist in an interrupted state. The flag is cleared before
+ * handling events so that calls to {@link Future#get()} do not immediately throw {@link TimeoutException}s.
+ * This method will conditionally set the current thread's interrupted flag prior to processing background events
+ * so that if there are any rebalance listeners, the interrupt state will be preserved. Immediately after
+ * processing the background events, the thread's interrupted flag is cleared.
+ *
+ * @param future Future from {@link UnsubscribeEvent}
+ * @param wasInterrupted {@code true} if the current thread was previously interrupted, {@code false} otherwise
+ * @param timer Timer which constrains the runtime of the operation
*/
- // Visible for testing
- <T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException) {
+ void waitForUnsubscribe(final CompletableFuture<?> future, final boolean wasInterrupted, final Timer timer) {
+ // At this point, the unsubscribe process is on its way. The application thread has no direct way of knowing
+ // where the background thread is in its journey of unsubscribing, hence this loop...
do {
+ // Depending on a number of variables (described in the method comments), a
+ // ConsumerRebalanceListenerCallbackNeededEvent may or may not appear in the background event queue.
+ // So there's really no choice but to process any events in the queue in case that event is waiting for
+ // the application thread to pick up and invoke the callback handler.
boolean hadEvents = false;
+
try {
- hadEvents = processBackgroundEvents();
- } catch (Exception e) {
- if (!ignoreErrorEventException.test(e))
+ if (wasInterrupted)
+ Thread.currentThread().interrupt();
+
+ try {
+ hadEvents = processBackgroundEvents();
+ } catch (InvalidTopicException e) {
+ // If users subscribe to an invalid topic name, they will get InvalidTopicException in error events,
+ // because network thread keeps trying to send MetadataRequest in the background.
+ // Ignore it to avoid unsubscribe failed.
+ } catch (Exception e) {
Review Comment:
Yes. Thanks for catching that. I removed it.
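
For readers following the interrupt handling described in the Javadoc in this hunk, here is a minimal standalone sketch of the general pattern, using only JDK classes. It is not the code from this PR: `InterruptSketch`, `getThrowsWhenInterrupted`, and `callbackSawInterrupt` are hypothetical names, and a plain `Runnable` stands in for a rebalance listener callback. In the JDK, a timed `get()` on an incomplete `CompletableFuture` throws `InterruptedException` when the calling thread's interrupt flag is already set, which is why the flag has to be cleared around such waits and restored only while the callback runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; none of these names exist in Kafka.
class InterruptSketch {

    // Returns true if get() on an incomplete future fails with
    // InterruptedException because the interrupt flag was already set.
    static boolean getThrowsWhenInterrupted() {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        Thread.currentThread().interrupt();
        try {
            pending.get(10, TimeUnit.MILLISECONDS);
            return false;
        } catch (InterruptedException e) {
            return true;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } finally {
            // Clear the flag so this sketch leaves no interrupt state behind.
            Thread.interrupted();
        }
    }

    // Restores the interrupt flag only while the callback runs, then clears
    // it again so later blocking waits are not affected. Returns what the
    // callback observed.
    static boolean callbackSawInterrupt(boolean wasInterrupted) {
        Thread.interrupted(); // start from a cleared flag
        boolean[] seen = new boolean[1];
        Runnable callback = () -> seen[0] = Thread.currentThread().isInterrupted();
        try {
            if (wasInterrupted)
                Thread.currentThread().interrupt();
            callback.run();
        } finally {
            Thread.interrupted(); // clear immediately after the callback
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("get() threw InterruptedException: " + getThrowsWhenInterrupted());
        System.out.println("callback saw interrupt (true): " + callbackSawInterrupt(true));
        System.out.println("callback saw interrupt (false): " + callbackSawInterrupt(false));
    }
}
```

The try/finally around the callback is one way to guarantee the flag is cleared even if the callback throws; whether the real method needs the same guarantee depends on how exceptions from `processBackgroundEvents()` are handled.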