This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch FLINK-38729-2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit bbdba2707d9593a8527289331f4d821b95ddbf57
Author: lvyanquan <[email protected]>
AuthorDate: Mon Mar 9 18:02:07 2026 +0800

    Fix CI failure.
---
 .../sink/DataSinkWriterOperatorFactory.java        | 47 +++++++++++++++++++---
 1 file changed, 41 insertions(+), 6 deletions(-)

diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java
index a286ac2e7..ab64f9c2b 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.runtime.operators.sink;
 
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
@@ -28,6 +29,8 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 
+import java.lang.reflect.Method;
+
 /** Operator factory for {@link DataSinkWriterOperator}. */
 @Internal
 public class DataSinkWriterOperatorFactory<CommT>
@@ -50,10 +53,10 @@ public class DataSinkWriterOperatorFactory<CommT>
     @Override
     public <T extends StreamOperator<CommittableMessage<CommT>>> T 
createStreamOperator(
             StreamOperatorParameters<CommittableMessage<CommT>> parameters) {
+        MailboxExecutor mailboxExecutor = getMailboxExecutor(parameters);
         if (isBounded) {
             BatchDataSinkWriterOperator<CommT> writerOperator =
-                    new BatchDataSinkWriterOperator<>(
-                            sink, processingTimeService, 
parameters.getMailboxExecutor());
+                    new BatchDataSinkWriterOperator<>(sink, 
processingTimeService, mailboxExecutor);
             writerOperator.setup(
                     parameters.getContainingTask(),
                     parameters.getStreamConfig(),
@@ -62,10 +65,7 @@ public class DataSinkWriterOperatorFactory<CommT>
         }
         DataSinkWriterOperator<CommT> writerOperator =
                 new DataSinkWriterOperator<>(
-                        sink,
-                        processingTimeService,
-                        parameters.getMailboxExecutor(),
-                        schemaOperatorID);
+                        sink, processingTimeService, mailboxExecutor, 
schemaOperatorID);
         writerOperator.setup(
                 parameters.getContainingTask(),
                 parameters.getStreamConfig(),
@@ -73,6 +73,41 @@ public class DataSinkWriterOperatorFactory<CommT>
         return (T) writerOperator;
     }
 
+    /**
+     * Obtains the {@link MailboxExecutor} in a way compatible with both Flink 
1.19 and 1.20+.
+     *
+     * <p>Flink 1.20+ added {@code 
StreamOperatorParameters.getMailboxExecutor()}. In Flink 1.19 and
+     * earlier, the executor must be obtained via {@code
+     * StreamTask.getMailboxExecutorFactory().createExecutor(chainIndex)}.
+     */
+    private MailboxExecutor getMailboxExecutor(
+            StreamOperatorParameters<CommittableMessage<CommT>> parameters) {
+        // Try Flink 1.20+ / 2.x API: 
StreamOperatorParameters.getMailboxExecutor()
+        try {
+            Method m = parameters.getClass().getMethod("getMailboxExecutor");
+            return (MailboxExecutor) m.invoke(parameters);
+        } catch (NoSuchMethodException ignored) {
+            // Fall through to Flink 1.19 path
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to invoke getMailboxExecutor on 
StreamOperatorParameters", e);
+        }
+
+        // Flink 1.19 and earlier: obtain from 
containingTask.getMailboxExecutorFactory()
+        try {
+            Object containingTask = parameters.getContainingTask();
+            Method getFactory = 
containingTask.getClass().getMethod("getMailboxExecutorFactory");
+            Object factory = getFactory.invoke(containingTask);
+            int chainIndex = parameters.getStreamConfig().getChainIndex();
+            Method createExecutor = 
factory.getClass().getMethod("createExecutor", int.class);
+            return (MailboxExecutor) createExecutor.invoke(factory, 
chainIndex);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to obtain MailboxExecutor from StreamTask for 
Flink 1.19 compatibility",
+                    e);
+        }
+    }
+
     @Override
     public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader 
classLoader) {
         if (isBounded) {

Reply via email to