This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new bcb4a4b61 [FLINK-39257] Remove compatibility code for older versions of Flink (#4317)
bcb4a4b61 is described below
commit bcb4a4b61404c6ecfb4e48e343b4968c44e7fc52
Author: Pei Yu <[email protected]>
AuthorDate: Tue Mar 17 09:08:20 2026 +0800
[FLINK-39257] Remove compatibility code for older versions of Flink (#4317)
Signed-off-by: Pei Yu <[email protected]>
---
.../flink/translator/DataSinkTranslator.java | 46 +++-------------------
1 file changed, 6 insertions(+), 40 deletions(-)
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
index aa530300b..117b08aed 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
@@ -36,7 +36,6 @@ import
org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import
org.apache.flink.cdc.runtime.operators.sink.BatchDataSinkFunctionOperator;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
import
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -46,12 +45,10 @@ import
org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
-
-import java.lang.reflect.InvocationTargetException;
+import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
/** Translator used to build {@link DataSink} for given {@link DataStream}. */
@Internal
@@ -178,8 +175,10 @@ public class DataSinkTranslator {
boolean isBatchMode,
OperatorID schemaOperatorID,
OperatorUidGenerator operatorUidGenerator) {
+ TwoPhaseCommittingSink<Event, CommT> committingSink =
+ (TwoPhaseCommittingSink<Event, CommT>) sink;
TypeInformation<CommittableMessage<CommT>> typeInformation =
- CommittableMessageTypeInfo.of(() -> getCommittableSerializer(sink));
+ CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
DataStream<CommittableMessage<CommT>> written =
inputStream
.transform(
@@ -202,8 +201,8 @@ public class DataSinkTranslator {
.transform(
SINK_COMMITTER_PREFIX + sinkName,
typeInformation,
- getCommitterOperatorFactory(
- sink, isBatchMode, isCheckpointingEnabled))
+ new CommitterOperatorFactory<>(
+ committingSink, isBatchMode, isCheckpointingEnabled))
.uid(operatorUidGenerator.generateUid("sink-committer"));
if (sink instanceof WithPostCommitTopology) {
@@ -215,37 +214,4 @@ public class DataSinkTranslator {
return sinkDef.getName()
.orElse(String.format("Flink CDC Event Sink: %s",
sinkDef.getType()));
}
-
- private static <CommT> SimpleVersionedSerializer<CommT>
getCommittableSerializer(Object sink) {
- // FIX ME: TwoPhaseCommittingSink has been deprecated, and its
signature has changed
- // during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer
supported.
- try {
- return (SimpleVersionedSerializer<CommT>)
-
sink.getClass().getDeclaredMethod("getCommittableSerializer").invoke(sink);
- } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
- throw new RuntimeException("Failed to get CommittableSerializer", e);
- }
- }
-
- private static <CommT>
- OneInputStreamOperatorFactory<CommittableMessage<CommT>,
CommittableMessage<CommT>>
- getCommitterOperatorFactory(
- Sink<Event> sink, boolean isBatchMode, boolean
isCheckpointingEnabled) {
- // FIX ME: OneInputStreamOperatorFactory is an @Internal class, and
its signature has
- // changed during Flink 1.18 to 1.19. Remove this when Flink 1.18 is
no longer supported.
- try {
- return (OneInputStreamOperatorFactory<
- CommittableMessage<CommT>,
CommittableMessage<CommT>>)
- Class.forName(
-
"org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory")
- .getDeclaredConstructors()[0]
- .newInstance(sink, isBatchMode,
isCheckpointingEnabled);
-
- } catch (ClassNotFoundException
- | InstantiationException
- | IllegalAccessException
- | InvocationTargetException e) {
- throw new RuntimeException("Failed to create CommitterOperatorFactory", e);
- }
- }
}