leonardBang commented on code in PR #19828:
URL: https://github.com/apache/flink/pull/19828#discussion_r897508261


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -398,6 +400,32 @@ private void maybeRegisterKafkaConsumerMetrics(
         }
     }
 
+    /**
+     * Catch {@link WakeupException} in Kafka consumer call and retry the invocation on exception.
+     *
+     * <p>This helper function handles a race condition as below:
+     *
+     * <ol>
+     *   <li>Fetcher thread finishes a {@link KafkaConsumer#poll(Duration)} call
+     *   <li>Task thread assigns new splits so invokes {@link #wakeUp()}, then the wakeup is
+     *       recorded and held by the consumer
+     *   <li>Later fetcher thread invokes {@link #handleSplitsChanges(SplitsChange)}, and
+     *       interactions with consumer will throw {@link WakeupException} because of the previously
+     *       held wakeup in the consumer
+     * </ol>
+     *
+     * <p>Under this case we need to catch the {@link WakeupException} and retry the operation.
+     */
+    private <V> V retryOnWakeup(Supplier<V> consumerCall) {
+        while (true) {
+            try {
+                return consumerCall.get();
+            } catch (WakeupException we) {
+                // Do nothing here and the loop will retry the consumer call.

Review Comment:
   Please add a log statement here. It would help with debugging if we can never leave the loop.
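
   A rough sketch of what I have in mind, not necessarily the final shape. To keep it self-contained it uses `java.util.logging` and a hypothetical stand-in `WakeupException` class; in the connector this would be the Kafka `WakeupException` and the class's existing logger. The `attempts` counter and the log message wording are my own assumptions, not part of the PR.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Supplier;
   import java.util.logging.Level;
   import java.util.logging.Logger;

   final class RetryOnWakeupSketch {
       private static final Logger LOG =
               Logger.getLogger(RetryOnWakeupSketch.class.getName());

       /** Hypothetical stand-in for Kafka's WakeupException, so the sketch compiles alone. */
       static final class WakeupException extends RuntimeException {}

       /** Retries the call on every WakeupException and logs each retry. */
       static <V> V retryOnWakeup(Supplier<V> consumerCall) {
           int attempts = 0; // assumption: a counter makes a stuck loop visible in the logs
           while (true) {
               try {
                   return consumerCall.get();
               } catch (WakeupException we) {
                   attempts++;
                   LOG.log(
                           Level.INFO,
                           "Caught WakeupException in consumer call (attempt {0}); retrying.",
                           attempts);
               }
           }
       }

       public static void main(String[] args) {
           // Simulate a consumer call that is woken up twice before it succeeds.
           AtomicInteger calls = new AtomicInteger();
           String result =
                   retryOnWakeup(
                           () -> {
                               if (calls.incrementAndGet() < 3) {
                                   throw new WakeupException();
                               }
                               return "assigned";
                           });
           System.out.println(result + " after " + calls.get() + " calls");
       }
   }
   ```

   With a message like this, a loop that never exits shows up as a steadily growing attempt number in the logs instead of a silent hang.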



