This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 33d00224 [FLINK-36630][Connectors/Kafka] Wrap consumer.position in 
retryOnWakeup (#133)
33d00224 is described below

commit 33d0022449601a69c20169268dcf6e1439830126
Author: trompa <[email protected]>
AuthorDate: Thu Mar 6 10:56:24 2025 +0100

    [FLINK-36630][Connectors/Kafka] Wrap consumer.position in retryOnWakeup 
(#133)
---
 .../kafka/source/reader/KafkaPartitionSplitReader.java | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index 23956f5d..74d4bc1f 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -124,7 +124,7 @@ public class KafkaPartitionSplitReader
         List<TopicPartition> finishedPartitions = new ArrayList<>();
         for (TopicPartition tp : consumer.assignment()) {
             long stoppingOffset = getStoppingOffset(tp);
-            long consumerPosition = consumer.position(tp);
+            long consumerPosition = getConsumerPosition(tp, "retrieving 
consumer position");
             // Stop fetching when the consumer's position reaches the 
stoppingOffset.
             // Control messages may follow the last record; therefore, using 
the last record's
             // offset as a stopping condition could result in indefinite 
blocking.
@@ -279,6 +279,10 @@ public class KafkaPartitionSplitReader
         }
     }
 
+    long getConsumerPosition(TopicPartition tp, String msg) {
+        return retryOnWakeup(() -> consumer.position(tp), msg);
+    }
+
     private void parseStartingOffsets(
             KafkaPartitionSplit split,
             List<TopicPartition> partitionsStartingFromEarliest,
@@ -371,10 +375,9 @@ public class KafkaPartitionSplitReader
         List<TopicPartition> emptyPartitions = new ArrayList<>();
         // If none of the partitions have any records,
         for (TopicPartition tp : consumer.assignment()) {
-            if (retryOnWakeup(
-                            () -> consumer.position(tp),
-                            "getting starting offset to check if split is 
empty")
-                    >= getStoppingOffset(tp)) {
+            long startingOffset =
+                    getConsumerPosition(tp, "getting starting offset to check 
if split is empty");
+            if (startingOffset >= getStoppingOffset(tp)) {
                 emptyPartitions.add(tp);
             }
         }
@@ -403,9 +406,8 @@ public class KafkaPartitionSplitReader
                 }
 
                 long startingOffset =
-                        retryOnWakeup(
-                                () -> 
consumer.position(split.getTopicPartition()),
-                                "logging starting position");
+                        getConsumerPosition(split.getTopicPartition(), 
"logging starting position");
+
                 long stoppingOffset = 
getStoppingOffset(split.getTopicPartition());
                 splitsInfo.add(
                         String.format(

Reply via email to