This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8264bb2a9c [flink] fix that AppendBypassCoordinateOperator does not generate compaction tasks. (#5897)
8264bb2a9c is described below

commit 8264bb2a9ce25d9f868790d1189d5eb9a073705a
Author: zhoulii <[email protected]>
AuthorDate: Tue Jul 15 14:25:22 2025 +0800

    [flink] fix that AppendBypassCoordinateOperator does not generate compaction tasks. (#5897)
---
 .../flink/source/AppendBypassCoordinateOperator.java  | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
index 89bba8d87f..3fd3e305de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.ExecutorUtils;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -56,6 +57,7 @@ public class AppendBypassCoordinateOperator<CommitT>
 
     private transient ScheduledExecutorService executorService;
     private transient LinkedBlockingQueue<AppendCompactTask> compactTasks;
+    private Throwable throwable;
 
     public AppendBypassCoordinateOperator(
             StreamOperatorParameters<Either<CommitT, AppendCompactTask>> parameters,
@@ -85,9 +87,15 @@ public class AppendBypassCoordinateOperator<CommitT>
 
     private void asyncPlan(AppendCompactCoordinator coordinator) {
         while (compactTasks.size() < MAX_PENDING_TASKS) {
-            List<AppendCompactTask> tasks = coordinator.run();
-            compactTasks.addAll(tasks);
-            if (tasks.isEmpty()) {
+            try {
+                List<AppendCompactTask> tasks = coordinator.run();
+                compactTasks.addAll(tasks);
+                if (tasks.isEmpty()) {
+                    break;
+                }
+            } catch (Throwable t) {
+                LOG.error("Fatal exception happened when generating compaction tasks.", t);
+                this.throwable = t;
                 break;
             }
         }
@@ -106,6 +114,11 @@ public class AppendBypassCoordinateOperator<CommitT>
 
     @Override
     public void processElement(StreamRecord<CommitT> record) throws Exception {
+        Preconditions.checkState(
+                throwable == null,
+                String.format(
+                        "Fatal exception happened when generating compaction tasks: %s",
+                        throwable));
         output.collect(new StreamRecord<>(Either.Left(record.getValue())));
     }
 

Reply via email to