This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new a60fe9301 [flink] Fluss enumerator partition discovery should be in 
fixed delay rather than fixed rate. (#1633)
a60fe9301 is described below

commit a60fe930163c561c3ef86668442eacb81f35b520
Author: Hongshun Wang <[email protected]>
AuthorDate: Sat Sep 27 10:06:34 2025 +0800

    [flink] Fluss enumerator partition discovery should be in fixed delay 
rather than fixed rate. (#1633)
---
 .../source/enumerator/FlinkSourceEnumerator.java   | 40 ++++++---
 .../flink/source/enumerator/WorkerExecutor.java    | 94 ++++++++++++++++++++++
 .../enumerator/FlinkSourceEnumeratorTest.java      | 31 ++++---
 .../flink/source/enumerator/MockWorkExecutor.java  | 84 +++++++++++++++++++
 4 files changed, 227 insertions(+), 22 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index de3731224..2138eca92 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -47,6 +47,7 @@ import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.utils.ExceptionUtils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -87,11 +88,13 @@ import static 
org.apache.fluss.utils.Preconditions.checkState;
  *       will be assigned to same reader.
  * </ul>
  */
+@Internal
 public class FlinkSourceEnumerator
         implements SplitEnumerator<SourceSplitBase, SourceEnumeratorState> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSourceEnumerator.class);
 
+    private final WorkerExecutor workerExecutor;
     private final TablePath tablePath;
     private final boolean hasPrimaryKey;
     private final boolean isPartitioned;
@@ -148,18 +151,22 @@ public class FlinkSourceEnumerator
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            @Nullable Predicate partitionFilters) {
+            @Nullable Predicate partitionFilters,
+            @Nullable LakeSource<LakeSplit> lakeSource) {
         this(
                 tablePath,
                 flussConf,
                 hasPrimaryKey,
                 isPartitioned,
                 context,
+                Collections.emptySet(),
+                Collections.emptyMap(),
+                null,
                 startingOffsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 streaming,
                 partitionFilters,
-                null);
+                lakeSource);
     }
 
     public FlinkSourceEnumerator(
@@ -168,6 +175,9 @@ public class FlinkSourceEnumerator
             boolean hasPrimaryKey,
             boolean isPartitioned,
             SplitEnumeratorContext<SourceSplitBase> context,
+            Set<TableBucket> assignedTableBuckets,
+            Map<Long, String> assignedPartitions,
+            List<SourceSplitBase> pendingHybridLakeFlussSplits,
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
@@ -179,17 +189,18 @@ public class FlinkSourceEnumerator
                 hasPrimaryKey,
                 isPartitioned,
                 context,
-                Collections.emptySet(),
-                Collections.emptyMap(),
-                null,
+                assignedTableBuckets,
+                assignedPartitions,
+                pendingHybridLakeFlussSplits,
                 startingOffsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 streaming,
                 partitionFilters,
-                lakeSource);
+                lakeSource,
+                new WorkerExecutor(context));
     }
 
-    public FlinkSourceEnumerator(
+    FlinkSourceEnumerator(
             TablePath tablePath,
             Configuration flussConf,
             boolean hasPrimaryKey,
@@ -202,7 +213,8 @@ public class FlinkSourceEnumerator
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
             @Nullable Predicate partitionFilters,
-            @Nullable LakeSource<LakeSplit> lakeSource) {
+            @Nullable LakeSource<LakeSplit> lakeSource,
+            WorkerExecutor workerExecutor) {
         this.tablePath = checkNotNull(tablePath);
         this.flussConf = checkNotNull(flussConf);
         this.hasPrimaryKey = hasPrimaryKey;
@@ -222,6 +234,7 @@ public class FlinkSourceEnumerator
         this.stoppingOffsetsInitializer =
                 streaming ? new NoStoppingOffsetsInitializer() : 
OffsetsInitializer.latest();
         this.lakeSource = lakeSource;
+        this.workerExecutor = workerExecutor;
     }
 
     @Override
@@ -257,8 +270,8 @@ public class FlinkSourceEnumerator
                                     + "with new partition discovery interval 
of {} ms.",
                             tablePath,
                             scanPartitionDiscoveryIntervalMs);
-                    // discover new partitions and handle new partitions
-                    context.callAsync(
+                    // discover and handle new partitions at a fixed delay.
+                    workerExecutor.callAsyncAtFixedDelay(
                             this::listPartitions,
                             this::checkPartitionChanges,
                             0,
@@ -268,7 +281,7 @@ public class FlinkSourceEnumerator
                     LOG.info(
                             "Starting the FlussSourceEnumerator for table {} 
without partition discovery.",
                             tablePath);
-                    context.callAsync(this::listPartitions, 
this::checkPartitionChanges);
+                    workerExecutor.callAsync(this::listPartitions, 
this::checkPartitionChanges);
                 }
             } else {
                 startInBatchMode();
@@ -434,7 +447,7 @@ public class FlinkSourceEnumerator
                     partitionChange.newPartitions.size(),
                     tablePath,
                     partitionChange.newPartitions);
-            context.callAsync(
+            workerExecutor.callAsync(
                     () -> initPartitionedSplits(partitionChange.newPartitions),
                     this::handleSplitsAdd);
         }
@@ -872,6 +885,9 @@ public class FlinkSourceEnumerator
     public void close() throws IOException {
         try {
             closed = true;
+            if (workerExecutor != null) {
+                workerExecutor.close();
+            }
             if (flussAdmin != null) {
                 flussAdmin.close();
             }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/WorkerExecutor.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/WorkerExecutor.java
new file mode 100644
index 000000000..8147ac8b3
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/WorkerExecutor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.enumerator;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+/**
+ * A worker executor that augments {@link SplitEnumeratorContext} with fixed-delay scheduling
+ * capabilities for asynchronous tasks.
+ *
+ * <p>{@link SplitEnumeratorContext} natively only supports fixed rate 
scheduling for asynchronous
+ * calls, which can lead to task accumulation if individual calls take too 
long to complete.
+ *
+ * <p>This executor wraps a single-threaded {@link ScheduledExecutorService} 
to handle async
+ * operations and route their results back to the coordinator thread through 
the {@link
+ * SplitEnumeratorContext#callAsync} methods.
+ *
+ * <p>TODO: This class is a workaround and should be removed once FLINK-38335 
is completed.
+ */
+@Internal
+public class WorkerExecutor implements AutoCloseable {
+    protected final SplitEnumeratorContext<SourceSplitBase> context;
+    private final ScheduledExecutorService scheduledExecutor;
+
+    public WorkerExecutor(SplitEnumeratorContext<SourceSplitBase> context) {
+        this.context = context;
+        this.scheduledExecutor =
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("SplitEnumeratorContextWrapper"));
+    }
+
+    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> 
handler) {
+        context.callAsync(callable, handler);
+    }
+
+    public <T> void callAsyncAtFixedDelay(
+            Callable<T> callable, BiConsumer<T, Throwable> handler, long 
initialDelay, long delay) {
+        scheduledExecutor.scheduleWithFixedDelay(
+                () -> {
+                    final CountDownLatch latch = new CountDownLatch(1);
+                    context.callAsync(
+                            () -> {
+                                try {
+                                    return callable.call();
+                                } finally {
+                                    latch.countDown();
+                                }
+                            },
+                            handler);
+                    // wait for the call to complete
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        throw new FlussRuntimeException(
+                                "Interrupted while waiting for async call to 
complete", e);
+                    }
+                },
+                initialDelay,
+                delay,
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() throws Exception {
+        scheduledExecutor.shutdownNow();
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index 5dc4af470..20196bc81 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -97,6 +97,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
+                            null,
                             null);
 
             enumerator.start();
@@ -144,6 +145,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
+                            null,
                             null);
             enumerator.start();
             // register all read
@@ -215,6 +217,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
+                            null,
                             null);
 
             enumerator.start();
@@ -261,6 +264,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
+                            null,
                             null);
 
             enumerator.start();
@@ -297,6 +301,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
+                            null,
                             null);
 
             enumerator.start();
@@ -391,6 +396,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
         ZooKeeperClient zooKeeperClient = 
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
         try (MockSplitEnumeratorContext<SourceSplitBase> context =
                         new MockSplitEnumeratorContext<>(numSubtasks);
+                MockWorkExecutor workExecutor = new MockWorkExecutor(context);
                 FlinkSourceEnumerator enumerator =
                         new FlinkSourceEnumerator(
                                 DEFAULT_TABLE_PATH,
@@ -398,23 +404,28 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                                 isPrimaryKeyTable,
                                 true,
                                 context,
+                                Collections.emptySet(),
+                                Collections.emptyMap(),
+                                null,
                                 OffsetsInitializer.full(),
                                 DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                                 streaming,
-                                null)) {
+                                null,
+                                null,
+                                workExecutor)) {
             Map<Long, String> partitionNameByIds =
                     waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
             enumerator.start();
 
             // invoke partition discovery callable again and there should be 
pending assignments.
-            runPeriodicPartitionDiscovery(context);
+            runPeriodicPartitionDiscovery(workExecutor);
 
             // register two readers
             registerReader(context, enumerator, 0);
             registerReader(context, enumerator, 1);
 
             // invoke partition discovery callable again, shouldn't produce 
RemovePartitionEvent.
-            runPeriodicPartitionDiscovery(context);
+            runPeriodicPartitionDiscovery(workExecutor);
             assertThat(context.getSentSourceEvent()).isEmpty();
 
             // now, register the third reader
@@ -434,7 +445,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                     createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, 
newPartitions);
 
             // invoke partition discovery callable again and there should be assignments.
-            runPeriodicPartitionDiscovery(context);
+            runPeriodicPartitionDiscovery(workExecutor);
 
             expectedAssignment = expectAssignments(enumerator, tableId, 
newPartitionNameIds);
             actualAssignments = getLastReadersAssignments(context);
@@ -450,7 +461,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                     createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, 
newPartitions);
 
             // invoke partition discovery callable again
-            runPeriodicPartitionDiscovery(context);
+            runPeriodicPartitionDiscovery(workExecutor);
 
             // there should be partition removed events
             Map<Integer, List<SourceEvent>> sentSourceEvents = 
context.getSentSourceEvent();
@@ -516,6 +527,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                                 OffsetsInitializer.full(),
                                 DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                                 streaming,
+                                null,
                                 null)) {
 
             // test splits for same non-partitioned bucket, should assign to 
same task
@@ -587,13 +599,12 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
         }
     }
 
-    private void 
runPeriodicPartitionDiscovery(MockSplitEnumeratorContext<SourceSplitBase> 
context)
-            throws Throwable {
+    private void runPeriodicPartitionDiscovery(MockWorkExecutor workExecutor) 
throws Throwable {
         // Fetch potential topic descriptions
-        context.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX);
+        workExecutor.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX);
         // Initialize offsets for discovered partitions
-        if (!context.getOneTimeCallables().isEmpty()) {
-            context.runNextOneTimeCallable();
+        if (!workExecutor.getOneTimeCallables().isEmpty()) {
+            workExecutor.runNextOneTimeCallable();
         }
     }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/MockWorkExecutor.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/MockWorkExecutor.java
new file mode 100644
index 000000000..052e85a06
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/MockWorkExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.enumerator;
+
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+
+/** A mock implementation of {@link WorkerExecutor} that separates task submission from execution. */
+public class MockWorkExecutor extends WorkerExecutor implements AutoCloseable {
+    private final List<Callable<?>> periodicCallables;
+
+    public MockWorkExecutor(MockSplitEnumeratorContext<SourceSplitBase> 
context) {
+        super(context);
+        this.periodicCallables = new ArrayList<>();
+    }
+
+    @Override
+    public <T> void callAsyncAtFixedDelay(
+            Callable<T> callable, BiConsumer<T, Throwable> handler, long 
initialDelay, long delay) {
+        periodicCallables.add(
+                () -> {
+                    CountDownLatch latch = new CountDownLatch(1);
+                    context.callAsync(
+                            () -> {
+                                try {
+                                    return callable.call();
+                                } finally {
+                                    latch.countDown();
+                                }
+                            },
+                            handler);
+                    try {
+                        ((MockSplitEnumeratorContext<?>) 
context).runNextOneTimeCallable();
+                    } catch (Throwable e) {
+                        throw new RuntimeException(e);
+                    }
+                    // wait for the call to complete
+                    latch.await();
+                    return null;
+                });
+    }
+
+    public void runPeriodicCallable(int index) throws Throwable {
+        periodicCallables.get(index).call();
+    }
+
+    public void runNextOneTimeCallable() throws Throwable {
+        ((MockSplitEnumeratorContext<?>) context).runNextOneTimeCallable();
+    }
+
+    public BlockingQueue<Callable<Future<?>>> getOneTimeCallables() {
+        return ((MockSplitEnumeratorContext<?>) context).getOneTimeCallables();
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.periodicCallables.clear();
+        ((MockSplitEnumeratorContext<?>) context).close();
+    }
+}

Reply via email to