This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch FLINK-38729-2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit bbdba2707d9593a8527289331f4d821b95ddbf57 Author: lvyanquan <[email protected]> AuthorDate: Mon Mar 9 18:02:07 2026 +0800 Fix CI failure. --- .../sink/DataSinkWriterOperatorFactory.java | 47 +++++++++++++++++++--- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java index a286ac2e7..ab64f9c2b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.runtime.operators.sink; +import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.Event; @@ -28,6 +29,8 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import java.lang.reflect.Method; + /** Operator factory for {@link DataSinkWriterOperator}. */ @Internal public class DataSinkWriterOperatorFactory<CommT> @@ -50,10 +53,10 @@ public class DataSinkWriterOperatorFactory<CommT> @Override public <T extends StreamOperator<CommittableMessage<CommT>>> T createStreamOperator( StreamOperatorParameters<CommittableMessage<CommT>> parameters) { + MailboxExecutor mailboxExecutor = getMailboxExecutor(parameters); if (isBounded) { BatchDataSinkWriterOperator<CommT> writerOperator = - new BatchDataSinkWriterOperator<>( - sink, processingTimeService, parameters.getMailboxExecutor()); + new BatchDataSinkWriterOperator<>(sink, processingTimeService, mailboxExecutor); writerOperator.setup( parameters.getContainingTask(), parameters.getStreamConfig(), @@ -62,10 +65,7 @@ public class DataSinkWriterOperatorFactory<CommT> } DataSinkWriterOperator<CommT> writerOperator = new DataSinkWriterOperator<>( - sink, - processingTimeService, - parameters.getMailboxExecutor(), - schemaOperatorID); + sink, processingTimeService, mailboxExecutor, schemaOperatorID); writerOperator.setup( parameters.getContainingTask(), parameters.getStreamConfig(), @@ -73,6 +73,41 @@ public class DataSinkWriterOperatorFactory<CommT> return (T) writerOperator; } + /** + * Obtains the {@link MailboxExecutor} in a way compatible with both Flink 1.19 and 1.20+. + * + * <p>Flink 1.20+ added {@code StreamOperatorParameters.getMailboxExecutor()}. In Flink 1.19 and + * earlier, the executor must be obtained via {@code + * StreamTask.getMailboxExecutorFactory().createExecutor(chainIndex)}. + */ + private MailboxExecutor getMailboxExecutor( + StreamOperatorParameters<CommittableMessage<CommT>> parameters) { + // Try Flink 1.20+ / 2.x API: StreamOperatorParameters.getMailboxExecutor() + try { + Method m = parameters.getClass().getMethod("getMailboxExecutor"); + return (MailboxExecutor) m.invoke(parameters); + } catch (NoSuchMethodException ignored) { + // Fall through to Flink 1.19 path + } catch (Exception e) { + throw new RuntimeException( + "Failed to invoke getMailboxExecutor on StreamOperatorParameters", e); + } + + // Flink 1.19 and earlier: obtain from containingTask.getMailboxExecutorFactory() + try { + Object containingTask = parameters.getContainingTask(); + Method getFactory = containingTask.getClass().getMethod("getMailboxExecutorFactory"); + Object factory = getFactory.invoke(containingTask); + int chainIndex = parameters.getStreamConfig().getChainIndex(); + Method createExecutor = factory.getClass().getMethod("createExecutor", int.class); + return (MailboxExecutor) createExecutor.invoke(factory, chainIndex); + } catch (Exception e) { + throw new RuntimeException( + "Failed to obtain MailboxExecutor from StreamTask for Flink 1.19 compatibility", + e); + } + } + @Override public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) { if (isBounded) {
