This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new a60fe9301 [flink] Fluss enumerator partition discovery should be in
fixed delay rather than fixed rate. (#1633)
a60fe9301 is described below
commit a60fe930163c561c3ef86668442eacb81f35b520
Author: Hongshun Wang <[email protected]>
AuthorDate: Sat Sep 27 10:06:34 2025 +0800
[flink] Fluss enumerator partition discovery should be in fixed delay
rather than fixed rate. (#1633)
---
.../source/enumerator/FlinkSourceEnumerator.java | 40 ++++++---
.../flink/source/enumerator/WorkerExecutor.java | 94 ++++++++++++++++++++++
.../enumerator/FlinkSourceEnumeratorTest.java | 31 ++++---
.../flink/source/enumerator/MockWorkExecutor.java | 84 +++++++++++++++++++
4 files changed, 227 insertions(+), 22 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index de3731224..2138eca92 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -47,6 +47,7 @@ import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.utils.ExceptionUtils;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -87,11 +88,13 @@ import static
org.apache.fluss.utils.Preconditions.checkState;
* will be assigned to same reader.
* </ul>
*/
+@Internal
public class FlinkSourceEnumerator
implements SplitEnumerator<SourceSplitBase, SourceEnumeratorState> {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkSourceEnumerator.class);
+ private final WorkerExecutor workerExecutor;
private final TablePath tablePath;
private final boolean hasPrimaryKey;
private final boolean isPartitioned;
@@ -148,18 +151,22 @@ public class FlinkSourceEnumerator
OffsetsInitializer startingOffsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
boolean streaming,
- @Nullable Predicate partitionFilters) {
+ @Nullable Predicate partitionFilters,
+ @Nullable LakeSource<LakeSplit> lakeSource) {
this(
tablePath,
flussConf,
hasPrimaryKey,
isPartitioned,
context,
+ Collections.emptySet(),
+ Collections.emptyMap(),
+ null,
startingOffsetsInitializer,
scanPartitionDiscoveryIntervalMs,
streaming,
partitionFilters,
- null);
+ lakeSource);
}
public FlinkSourceEnumerator(
@@ -168,6 +175,9 @@ public class FlinkSourceEnumerator
boolean hasPrimaryKey,
boolean isPartitioned,
SplitEnumeratorContext<SourceSplitBase> context,
+ Set<TableBucket> assignedTableBuckets,
+ Map<Long, String> assignedPartitions,
+ List<SourceSplitBase> pendingHybridLakeFlussSplits,
OffsetsInitializer startingOffsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
boolean streaming,
@@ -179,17 +189,18 @@ public class FlinkSourceEnumerator
hasPrimaryKey,
isPartitioned,
context,
- Collections.emptySet(),
- Collections.emptyMap(),
- null,
+ assignedTableBuckets,
+ assignedPartitions,
+ pendingHybridLakeFlussSplits,
startingOffsetsInitializer,
scanPartitionDiscoveryIntervalMs,
streaming,
partitionFilters,
- lakeSource);
+ lakeSource,
+ new WorkerExecutor(context));
}
- public FlinkSourceEnumerator(
+ FlinkSourceEnumerator(
TablePath tablePath,
Configuration flussConf,
boolean hasPrimaryKey,
@@ -202,7 +213,8 @@ public class FlinkSourceEnumerator
long scanPartitionDiscoveryIntervalMs,
boolean streaming,
@Nullable Predicate partitionFilters,
- @Nullable LakeSource<LakeSplit> lakeSource) {
+ @Nullable LakeSource<LakeSplit> lakeSource,
+ WorkerExecutor workerExecutor) {
this.tablePath = checkNotNull(tablePath);
this.flussConf = checkNotNull(flussConf);
this.hasPrimaryKey = hasPrimaryKey;
@@ -222,6 +234,7 @@ public class FlinkSourceEnumerator
this.stoppingOffsetsInitializer =
streaming ? new NoStoppingOffsetsInitializer() :
OffsetsInitializer.latest();
this.lakeSource = lakeSource;
+ this.workerExecutor = workerExecutor;
}
@Override
@@ -257,8 +270,8 @@ public class FlinkSourceEnumerator
+ "with new partition discovery interval
of {} ms.",
tablePath,
scanPartitionDiscoveryIntervalMs);
- // discover new partitions and handle new partitions
- context.callAsync(
+            // discover and handle new partitions at a fixed delay.
+ workerExecutor.callAsyncAtFixedDelay(
this::listPartitions,
this::checkPartitionChanges,
0,
@@ -268,7 +281,7 @@ public class FlinkSourceEnumerator
LOG.info(
"Starting the FlussSourceEnumerator for table {}
without partition discovery.",
tablePath);
- context.callAsync(this::listPartitions,
this::checkPartitionChanges);
+ workerExecutor.callAsync(this::listPartitions,
this::checkPartitionChanges);
}
} else {
startInBatchMode();
@@ -434,7 +447,7 @@ public class FlinkSourceEnumerator
partitionChange.newPartitions.size(),
tablePath,
partitionChange.newPartitions);
- context.callAsync(
+ workerExecutor.callAsync(
() -> initPartitionedSplits(partitionChange.newPartitions),
this::handleSplitsAdd);
}
@@ -872,6 +885,9 @@ public class FlinkSourceEnumerator
public void close() throws IOException {
try {
closed = true;
+ if (workerExecutor != null) {
+ workerExecutor.close();
+ }
if (flussAdmin != null) {
flussAdmin.close();
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/WorkerExecutor.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/WorkerExecutor.java
new file mode 100644
index 000000000..8147ac8b3
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/WorkerExecutor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.enumerator;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+/**
+ * A worker executor that extends {@link SplitEnumeratorContext} with fixed
delay scheduling
+ * capabilities for asynchronous tasks.
+ *
+ * <p>{@link SplitEnumeratorContext} natively only supports fixed rate
scheduling for asynchronous
+ * calls, which can lead to task accumulation if individual calls take too
long to complete.
+ *
+ * <p>This executor wraps a single-threaded {@link ScheduledExecutorService}
to handle async
+ * operations and route their results back to the coordinator thread through
the {@link
+ * SplitEnumeratorContext#callAsync} methods.
+ *
+ * <p>TODO: This class is a workaround and should be removed once FLINK-38335
is completed.
+ */
+@Internal
+public class WorkerExecutor implements AutoCloseable {
+ protected final SplitEnumeratorContext<SourceSplitBase> context;
+ private final ScheduledExecutorService scheduledExecutor;
+
+ public WorkerExecutor(SplitEnumeratorContext<SourceSplitBase> context) {
+ this.context = context;
+ this.scheduledExecutor =
+ Executors.newScheduledThreadPool(
+ 1, new
ExecutorThreadFactory("SplitEnumeratorContextWrapper"));
+ }
+
+ public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable>
handler) {
+ context.callAsync(callable, handler);
+ }
+
+ public <T> void callAsyncAtFixedDelay(
+ Callable<T> callable, BiConsumer<T, Throwable> handler, long
initialDelay, long delay) {
+ scheduledExecutor.scheduleWithFixedDelay(
+ () -> {
+ final CountDownLatch latch = new CountDownLatch(1);
+ context.callAsync(
+ () -> {
+ try {
+ return callable.call();
+ } finally {
+ latch.countDown();
+ }
+ },
+ handler);
+ // wait for the call to complete
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new FlussRuntimeException(
+ "Interrupted while waiting for async call to
complete", e);
+ }
+ },
+ initialDelay,
+ delay,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close() throws Exception {
+ scheduledExecutor.shutdownNow();
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index 5dc4af470..20196bc81 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -97,6 +97,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
OffsetsInitializer.full(),
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
+ null,
null);
enumerator.start();
@@ -144,6 +145,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
OffsetsInitializer.full(),
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
+ null,
null);
enumerator.start();
// register all read
@@ -215,6 +217,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
OffsetsInitializer.full(),
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
+ null,
null);
enumerator.start();
@@ -261,6 +264,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
OffsetsInitializer.full(),
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
+ null,
null);
enumerator.start();
@@ -297,6 +301,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
OffsetsInitializer.full(),
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
+ null,
null);
enumerator.start();
@@ -391,6 +396,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
ZooKeeperClient zooKeeperClient =
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
try (MockSplitEnumeratorContext<SourceSplitBase> context =
new MockSplitEnumeratorContext<>(numSubtasks);
+ MockWorkExecutor workExecutor = new MockWorkExecutor(context);
FlinkSourceEnumerator enumerator =
new FlinkSourceEnumerator(
DEFAULT_TABLE_PATH,
@@ -398,23 +404,28 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
isPrimaryKeyTable,
true,
context,
+ Collections.emptySet(),
+ Collections.emptyMap(),
+ null,
OffsetsInitializer.full(),
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
- null)) {
+ null,
+ null,
+ workExecutor)) {
Map<Long, String> partitionNameByIds =
waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
enumerator.start();
// invoke partition discovery callable again and there should be
pending assignments.
- runPeriodicPartitionDiscovery(context);
+ runPeriodicPartitionDiscovery(workExecutor);
// register two readers
registerReader(context, enumerator, 0);
registerReader(context, enumerator, 1);
// invoke partition discovery callable again, shouldn't produce
RemovePartitionEvent.
- runPeriodicPartitionDiscovery(context);
+ runPeriodicPartitionDiscovery(workExecutor);
assertThat(context.getSentSourceEvent()).isEmpty();
// now, register the third reader
@@ -434,7 +445,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH,
newPartitions);
                // invoke partition discovery callable again and there should be assignments.
- runPeriodicPartitionDiscovery(context);
+ runPeriodicPartitionDiscovery(workExecutor);
expectedAssignment = expectAssignments(enumerator, tableId,
newPartitionNameIds);
actualAssignments = getLastReadersAssignments(context);
@@ -450,7 +461,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH,
newPartitions);
// invoke partition discovery callable again
- runPeriodicPartitionDiscovery(context);
+ runPeriodicPartitionDiscovery(workExecutor);
// there should be partition removed events
Map<Integer, List<SourceEvent>> sentSourceEvents =
context.getSentSourceEvent();
@@ -516,6 +527,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
OffsetsInitializer.full(),
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
+ null,
null)) {
// test splits for same non-partitioned bucket, should assign to
same task
@@ -587,13 +599,12 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
}
}
- private void
runPeriodicPartitionDiscovery(MockSplitEnumeratorContext<SourceSplitBase>
context)
- throws Throwable {
+ private void runPeriodicPartitionDiscovery(MockWorkExecutor workExecutor)
throws Throwable {
// Fetch potential topic descriptions
- context.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX);
+ workExecutor.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX);
// Initialize offsets for discovered partitions
- if (!context.getOneTimeCallables().isEmpty()) {
- context.runNextOneTimeCallable();
+ if (!workExecutor.getOneTimeCallables().isEmpty()) {
+ workExecutor.runNextOneTimeCallable();
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/MockWorkExecutor.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/MockWorkExecutor.java
new file mode 100644
index 000000000..052e85a06
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/MockWorkExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.enumerator;
+
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+
+/** A mock implementation of WorkerExecutor which separates task submission from task execution. */
+public class MockWorkExecutor extends WorkerExecutor implements AutoCloseable {
+ private final List<Callable<?>> periodicCallables;
+
+ public MockWorkExecutor(MockSplitEnumeratorContext<SourceSplitBase>
context) {
+ super(context);
+ this.periodicCallables = new ArrayList<>();
+ }
+
+ @Override
+ public <T> void callAsyncAtFixedDelay(
+ Callable<T> callable, BiConsumer<T, Throwable> handler, long
initialDelay, long delay) {
+ periodicCallables.add(
+ () -> {
+ CountDownLatch latch = new CountDownLatch(1);
+ context.callAsync(
+ () -> {
+ try {
+ return callable.call();
+ } finally {
+ latch.countDown();
+ }
+ },
+ handler);
+ try {
+ ((MockSplitEnumeratorContext<?>)
context).runNextOneTimeCallable();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ // wait for the call to complete
+ latch.await();
+ return null;
+ });
+ }
+
+ public void runPeriodicCallable(int index) throws Throwable {
+ periodicCallables.get(index).call();
+ }
+
+ public void runNextOneTimeCallable() throws Throwable {
+ ((MockSplitEnumeratorContext<?>) context).runNextOneTimeCallable();
+ }
+
+ public BlockingQueue<Callable<Future<?>>> getOneTimeCallables() {
+ return ((MockSplitEnumeratorContext<?>) context).getOneTimeCallables();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.periodicCallables.clear();
+ ((MockSplitEnumeratorContext<?>) context).close();
+ }
+}