This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 33d00224 [FLINK-36630][Connectors/Kafka] Wrap consumer.position in retryOnWakeup (#133)
33d00224 is described below
commit 33d0022449601a69c20169268dcf6e1439830126
Author: trompa <[email protected]>
AuthorDate: Thu Mar 6 10:56:24 2025 +0100
[FLINK-36630][Connectors/Kafka] Wrap consumer.position in retryOnWakeup (#133)
---
.../kafka/source/reader/KafkaPartitionSplitReader.java | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index 23956f5d..74d4bc1f 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -124,7 +124,7 @@ public class KafkaPartitionSplitReader
List<TopicPartition> finishedPartitions = new ArrayList<>();
for (TopicPartition tp : consumer.assignment()) {
long stoppingOffset = getStoppingOffset(tp);
- long consumerPosition = consumer.position(tp);
+ long consumerPosition = getConsumerPosition(tp, "retrieving
consumer position");
// Stop fetching when the consumer's position reaches the
stoppingOffset.
// Control messages may follow the last record; therefore, using
the last record's
// offset as a stopping condition could result in indefinite
blocking.
@@ -279,6 +279,10 @@ public class KafkaPartitionSplitReader
}
}
+ long getConsumerPosition(TopicPartition tp, String msg) {
+ return retryOnWakeup(() -> consumer.position(tp), msg);
+ }
+
private void parseStartingOffsets(
KafkaPartitionSplit split,
List<TopicPartition> partitionsStartingFromEarliest,
@@ -371,10 +375,9 @@ public class KafkaPartitionSplitReader
List<TopicPartition> emptyPartitions = new ArrayList<>();
// If none of the partitions have any records,
for (TopicPartition tp : consumer.assignment()) {
- if (retryOnWakeup(
- () -> consumer.position(tp),
- "getting starting offset to check if split is
empty")
- >= getStoppingOffset(tp)) {
+ long startingOffset =
+ getConsumerPosition(tp, "getting starting offset to check
if split is empty");
+ if (startingOffset >= getStoppingOffset(tp)) {
emptyPartitions.add(tp);
}
}
@@ -403,9 +406,8 @@ public class KafkaPartitionSplitReader
}
long startingOffset =
- retryOnWakeup(
- () ->
consumer.position(split.getTopicPartition()),
- "logging starting position");
+ getConsumerPosition(split.getTopicPartition(),
"logging starting position");
+
long stoppingOffset =
getStoppingOffset(split.getTopicPartition());
splitsInfo.add(
String.format(