This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new c5ca56ba2 [bugfix] FlinkSourceSplitReader sends
PartitionBucketsUnsubscribedEvent to FlinkSourceEnumerator when subscribing to
a removed partition (#1220) (#1248)
c5ca56ba2 is described below
commit c5ca56ba2c8fc69102820d4049fe45ee6967fb30
Author: Yang Wang <[email protected]>
AuthorDate: Thu Jul 24 14:21:17 2025 +0800
[bugfix] FlinkSourceSplitReader sends PartitionBucketsUnsubscribedEvent to
FlinkSourceEnumerator when subscribing to a removed partition (#1220) (#1248)
Co-authored-by: ocean.wy <[email protected]>
---
.../fluss/flink/source/reader/FlinkSourceSplitReader.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
index dbc5e18c2..55c57c79c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -108,6 +108,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
private final Set<String> emptyLogSplits;
// track split IDs corresponding to removed partitions
private final Set<String> removedSplits = new HashSet<>();
+ // Set to collect table buckets that are unsubscribed.
+ private Set<TableBucket> unsubscribedTableBuckets = new HashSet<>();
public FlinkSourceSplitReader(
Configuration flussConf,
@@ -267,6 +269,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
if (partitionNotExist) {
// mark the not exist partition to be removed
removedSplits.add(split.splitId());
+ // mark the table bucket to be unsubscribed
+ unsubscribedTableBuckets.add(tableBucket);
LOG.warn(
"Partition {} does not exist when subscribing to log for split {}. Skipping subscription.",
partitionId,
@@ -290,8 +294,6 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
}
public Set<TableBucket> removePartitions(Map<Long, String> removedPartitions) {
- // Set to collect table buckets that are unsubscribed.
- Set<TableBucket> unsubscribedTableBuckets = new HashSet<>();
// First, if the current active bounded split belongs to a removed partition,
// finish it so it will not be restored.
if (currentBoundedSplit != null) {
@@ -352,7 +354,9 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
}
}
- return unsubscribedTableBuckets;
+ Set<TableBucket> currentUnsubscribedTableBuckets = this.unsubscribedTableBuckets;
+ this.unsubscribedTableBuckets = new HashSet<>();
+ return currentUnsubscribedTableBuckets;
}
private void checkSnapshotSplitOrStartNext() {