leonardBang commented on code in PR #19828: URL: https://github.com/apache/flink/pull/19828#discussion_r897508261
##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -398,6 +400,32 @@ private void maybeRegisterKafkaConsumerMetrics(
         }
     }
 
+    /**
+     * Catch {@link WakeupException} in Kafka consumer call and retry the invocation on exception.
+     *
+     * <p>This helper function handles a race condition as below:
+     *
+     * <ol>
+     *   <li>Fetcher thread finishes a {@link KafkaConsumer#poll(Duration)} call
+     *   <li>Task thread assigns new splits so invokes {@link #wakeUp()}, then the wakeup is
+     *       recorded and held by the consumer
+     *   <li>Later fetcher thread invokes {@link #handleSplitsChanges(SplitsChange)}, and
+     *       interactions with consumer will throw {@link WakeupException} because of the previously
+     *       held wakeup in the consumer
+     * </ol>
+     *
+     * <p>Under this case we need to catch the {@link WakeupException} and retry the operation.
+     */
+    private <V> V retryOnWakeup(Supplier<V> consumerCall) {
+        while (true) {
+            try {
+                return consumerCall.get();
+            } catch (WakeupException we) {
+                // Do nothing here and the loop will retry the consumer call.

Review Comment:
   Please add a log statement here. It will help with debugging if the loop never exits.
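
One possible way to address this comment, as a minimal standalone sketch rather than the change that was actually merged: log each caught wakeup together with a retry count, so a loop that never exits is visible in the logs. Everything beyond the quoted `retryOnWakeup` body is an assumption made for illustration: the `description` parameter and the attempt counter are hypothetical, the nested `WakeupException` is a stand-in for `org.apache.kafka.common.errors.WakeupException` so the example compiles without Kafka, and `java.util.logging` is used only to keep the example self-contained (the connector would presumably use its own logger).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.logging.Logger;

public class RetryOnWakeupSketch {
    private static final Logger LOG =
            Logger.getLogger(RetryOnWakeupSketch.class.getName());

    /** Stand-in for Kafka's WakeupException so this sketch runs without Kafka. */
    static class WakeupException extends RuntimeException {}

    /**
     * Retries the call while it throws WakeupException, logging every retry.
     * The description argument is a hypothetical addition used only in the log line.
     */
    static <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
        int retries = 0;
        while (true) {
            try {
                return consumerCall.get();
            } catch (WakeupException we) {
                retries++;
                // The retry count makes a stuck loop easy to spot in the logs.
                LOG.info("Caught WakeupException while " + description
                        + "; retrying (retry #" + retries + ")");
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated consumer call: the first invocation hits a pending wakeup.
        long position = retryOnWakeup(() -> {
            if (calls.getAndIncrement() == 0) {
                throw new WakeupException();
            }
            return 42L;
        }, "getting the consumer position");
        System.out.println("position=" + position + ", calls=" + calls.get());
    }
}
```

Logging at a non-debug level seems reasonable here because the race described in the Javadoc should be rare, so repeated messages would point to a real problem rather than normal operation.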