This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new c5ca56ba2 [bugfix] FlinkSourceSplitReader sends PartitionBucketsUnsubscribedEvent to FlinkSourceEnumerator when subscribing to a removed partition (#1220) (#1248)
c5ca56ba2 is described below

commit c5ca56ba2c8fc69102820d4049fe45ee6967fb30
Author: Yang Wang <[email protected]>
AuthorDate: Thu Jul 24 14:21:17 2025 +0800

    [bugfix] FlinkSourceSplitReader sends PartitionBucketsUnsubscribedEvent to FlinkSourceEnumerator when subscribing to a removed partition (#1220) (#1248)
    
    Co-authored-by: ocean.wy <[email protected]>
---
 .../fluss/flink/source/reader/FlinkSourceSplitReader.java      | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
index dbc5e18c2..55c57c79c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -108,6 +108,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
     private final Set<String> emptyLogSplits;
     // track split IDs corresponding to removed partitions
     private final Set<String> removedSplits = new HashSet<>();
+    // Set to collect table buckets that are unsubscribed.
+    private Set<TableBucket> unsubscribedTableBuckets = new HashSet<>();
 
     public FlinkSourceSplitReader(
             Configuration flussConf,
@@ -267,6 +269,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
                     if (partitionNotExist) {
                         // mark the not exist partition to be removed
                         removedSplits.add(split.splitId());
+                        // mark the table bucket to be unsubscribed
+                        unsubscribedTableBuckets.add(tableBucket);
                         LOG.warn(
                                 "Partition {} does not exist when subscribing to log for split {}. Skipping subscription.",
                                 partitionId,
@@ -290,8 +294,6 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
     }
 
     public Set<TableBucket> removePartitions(Map<Long, String> removedPartitions) {
-        // Set to collect table buckets that are unsubscribed.
-        Set<TableBucket> unsubscribedTableBuckets = new HashSet<>();
         // First, if the current active bounded split belongs to a removed partition,
         // finish it so it will not be restored.
         if (currentBoundedSplit != null) {
@@ -352,7 +354,9 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
             }
         }
 
-        return unsubscribedTableBuckets;
+        Set<TableBucket> currentUnsubscribedTableBuckets = this.unsubscribedTableBuckets;
+        this.unsubscribedTableBuckets = new HashSet<>();
+        return currentUnsubscribedTableBuckets;
     }
 
     private void checkSnapshotSplitOrStartNext() {

Reply via email to