This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8264bb2a9c [flink] fix that AppendBypassCoordinateOperator does not
generate compaction tasks. (#5897)
8264bb2a9c is described below
commit 8264bb2a9ce25d9f868790d1189d5eb9a073705a
Author: zhoulii <[email protected]>
AuthorDate: Tue Jul 15 14:25:22 2025 +0800
[flink] fix that AppendBypassCoordinateOperator does not generate
compaction tasks. (#5897)
---
.../flink/source/AppendBypassCoordinateOperator.java | 19 ++++++++++++++++---
1 file changed, 16 insertions(+), 3 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
index 89bba8d87f..3fd3e305de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorUtils;
+import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -56,6 +57,7 @@ public class AppendBypassCoordinateOperator<CommitT>
private transient ScheduledExecutorService executorService;
private transient LinkedBlockingQueue<AppendCompactTask> compactTasks;
+ private Throwable throwable;
public AppendBypassCoordinateOperator(
StreamOperatorParameters<Either<CommitT, AppendCompactTask>> parameters,
@@ -85,9 +87,15 @@ public class AppendBypassCoordinateOperator<CommitT>
private void asyncPlan(AppendCompactCoordinator coordinator) {
while (compactTasks.size() < MAX_PENDING_TASKS) {
- List<AppendCompactTask> tasks = coordinator.run();
- compactTasks.addAll(tasks);
- if (tasks.isEmpty()) {
+ try {
+ List<AppendCompactTask> tasks = coordinator.run();
+ compactTasks.addAll(tasks);
+ if (tasks.isEmpty()) {
+ break;
+ }
+ } catch (Throwable t) {
+ LOG.error("Fatal exception happened when generating compaction tasks.", t);
+ this.throwable = t;
break;
}
}
@@ -106,6 +114,11 @@ public class AppendBypassCoordinateOperator<CommitT>
@Override
public void processElement(StreamRecord<CommitT> record) throws Exception {
+ Preconditions.checkState(
+ throwable == null,
+ String.format(
+ "Fatal exception happened when generating compaction tasks: %s",
+ throwable));
output.collect(new StreamRecord<>(Either.Left(record.getValue())));
}