This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch 2.x-connect-tmp-2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit fcc0ba08c105d54d8f5c5a1dc507804a53515f88 Author: lvyanquan <[email protected]> AuthorDate: Wed Mar 18 14:00:15 2026 +0800 Address comments. --- .github/workflows/modules.py | 14 -------- .../flink/translator/DataSinkTranslator.java | 37 ---------------------- .../src/test/resources/log4j2-test.properties | 2 +- .../flink-cdc-pipeline-connector-paimon/pom.xml | 2 +- .../cdc/connectors/postgres/PostgresTestBase.java | 10 ++++++ .../src/test/resources/ddl/replica_identity.sql | 5 ++- flink-cdc-connect/pom.xml | 4 ++- .../flink-cdc-pipeline-e2e-tests/pom.xml | 11 +------ pom.xml | 2 ++ 9 files changed, 20 insertions(+), 67 deletions(-) diff --git a/.github/workflows/modules.py b/.github/workflows/modules.py index 872e08526..86e94164f 100755 --- a/.github/workflows/modules.py +++ b/.github/workflows/modules.py @@ -43,19 +43,6 @@ MODULES_PIPELINE_CONNECTORS = [ "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values" ] -MODULES_PIPELINE_CONNECTORS_2_X = [ - "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql", - "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle", - "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss", - "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values", - "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres", - "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg", - "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute", - "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris", - "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks", - "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch" -] - MODULES_MYSQL_SOURCE = [ "flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc", "flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-mysql-cdc" @@ -165,7 +152,6 @@ ALL_MODULES = set( MODULES_CORE + MODULES_CORE_2_X + MODULES_PIPELINE_CONNECTORS + - MODULES_PIPELINE_CONNECTORS_2_X + MODULES_MYSQL_SOURCE + MODULES_MYSQL_PIPELINE + MODULES_POSTGRES_SOURCE + diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java index d59ad1227..14eb95134 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java @@ -36,7 +36,6 @@ import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils; import org.apache.flink.cdc.runtime.operators.sink.BatchDataSinkFunctionOperator; import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator; import org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory; -import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; @@ -48,14 +47,11 @@ import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; import org.apache.flink.streaming.api.transformations.PhysicalTransformation; import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; -import java.lang.reflect.InvocationTargetException; - /** Translator used to build {@link DataSink} for given {@link DataStream}. */ @Internal public class DataSinkTranslator { @@ -224,37 +220,4 @@ public class DataSinkTranslator { return sinkDef.getName() .orElse(String.format("Flink CDC Event Sink: %s", sinkDef.getType())); } - - private static <CommT> SimpleVersionedSerializer<CommT> getCommittableSerializer(Object sink) { - // FIX ME: TwoPhaseCommittingSink has been deprecated, and its signature has changed - // during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported. - try { - return (SimpleVersionedSerializer<CommT>) - sink.getClass().getMethod("getCommittableSerializer").invoke(sink); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException("Failed to get CommittableSerializer", e); - } - } - - private static <CommT> - OneInputStreamOperatorFactory<CommittableMessage<CommT>, CommittableMessage<CommT>> - getCommitterOperatorFactory( - Sink<Event> sink, boolean isBatchMode, boolean isCheckpointingEnabled) { - // FIX ME: OneInputStreamOperatorFactory is an @Internal class, and its signature has - // changed during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported. - try { - return (OneInputStreamOperatorFactory< - CommittableMessage<CommT>, CommittableMessage<CommT>>) - Class.forName( - "org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory") - .getDeclaredConstructors()[0] - .newInstance(sink, isBatchMode, isCheckpointingEnabled); - - } catch (ClassNotFoundException - | InstantiationException - | IllegalAccessException - | InvocationTargetException e) { - throw new RuntimeException("Failed to create CommitterOperatorFactory", e); - } - } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/resources/log4j2-test.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/resources/log4j2-test.properties index 318095364..32df1c025 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/resources/log4j2-test.properties +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/resources/log4j2-test.properties @@ -15,7 +15,7 @@ # Set root logger level to ERROR to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level=INFO +rootLogger.level=ERROR rootLogger.appenderRef.test.ref = TestLogger appender.testlogger.name = TestLogger diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml index 274129b0c..5e15835a8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml @@ -39,7 +39,7 @@ limitations under the License. <!-- paimon dependency --> <dependency> <groupId>org.apache.paimon</groupId> - <artifactId>paimon-flink-${flink.major.version}</artifactId> + <artifactId>paimon-flink-${paimon.flink.major.version}</artifactId> <version>${paimon.version}</version> </dependency> diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java index 6d9205386..dafd9fcc4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java @@ -171,7 +171,17 @@ public abstract class PostgresTestBase extends AbstractTestBase { } protected void waitForSnapshotStarted(String sinkName) throws InterruptedException { + waitForSnapshotStarted(sinkName, 120000L); // default 2 minutes timeout + } + + protected void waitForSnapshotStarted(String sinkName, long timeoutMillis) + throws InterruptedException { + long start = System.currentTimeMillis(); while (sinkSize(sinkName) == 0) { + if (System.currentTimeMillis() - start > timeoutMillis) { + throw new AssertionError( + "Timeout waiting for snapshot to start. Sink: " + sinkName); + } sleep(300); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/replica_identity.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/replica_identity.sql index 0df973a76..b6cd6be41 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/replica_identity.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/replica_identity.sql @@ -27,9 +27,8 @@ CREATE TABLE products ( ); ALTER SEQUENCE products_id_seq RESTART WITH 101; --- Enable REPLICA IDENTITY FULL to capture before values for UPDATE/DELETE operations --- This is required for upsert changelog mode to work correctly -ALTER TABLE products REPLICA IDENTITY FULL; +-- USE default REPLICA IDENTITY +-- ALTER TABLE products REPLICA IDENTITY FULL; INSERT INTO products VALUES (default,'scooter','Small 2-wheel scooter',3.14), diff --git a/flink-cdc-connect/pom.xml b/flink-cdc-connect/pom.xml index 44a51d1c8..4cb3bb518 100644 --- a/flink-cdc-connect/pom.xml +++ b/flink-cdc-connect/pom.xml @@ -36,6 +36,9 @@ limitations under the License. <profiles> <profile> <id>flink1</id> + <properties> + <paimon.flink.major.version>1.20</paimon.flink.major.version> + </properties> <activation> <activeByDefault>true</activeByDefault> </activation> @@ -53,7 +56,6 @@ limitations under the License. <id>flink2</id> <properties> <flink.version>${flink.2.x.version}</flink.version> - <flink.major.version>2.1</flink.major.version> </properties> <dependencies> <dependency> diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index d8d4ebcac..176fd8d4b 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -773,7 +773,7 @@ limitations under the License. </artifactItem> <artifactItem> <groupId>org.apache.paimon</groupId> - <artifactId>paimon-flink-${flink.major.version}</artifactId> + <artifactId>paimon-flink-2.1</artifactId> <version>${paimon.version}</version> <destFileName>paimon-sql-connector-2.2.0.jar</destFileName> <type>jar</type> @@ -823,13 +823,4 @@ limitations under the License. </plugin> </plugins> </build> - - <profiles> - <profile> - <id>flink2</id> - <properties> - <flink.major.version>2.1</flink.major.version> - </properties> - </profile> - </profiles> </project> diff --git a/pom.xml b/pom.xml index d58218c94..4b8b737ba 100644 --- a/pom.xml +++ b/pom.xml @@ -114,6 +114,7 @@ limitations under the License. <maven.compiler.source>${source.java.version}</maven.compiler.source> <maven.compiler.target>${target.java.version}</maven.compiler.target> <flink.kafka.connector.version>3.3.0-1.20</flink.kafka.connector.version> + <paimon.flink.major.version>1.20</paimon.flink.major.version> </properties> <dependencyManagement> @@ -853,6 +854,7 @@ limitations under the License. <id>flink2</id> <properties> <flink.kafka.connector.version>4.0.1-2.0</flink.kafka.connector.version> + <paimon.flink.major.version>2.1</paimon.flink.major.version> </properties> <build> <pluginManagement>
