This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch FLINK-38729-2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/FLINK-38729-2 by this push:
     new 41172a30f Remove flink-cdc-pipeline-e2e-tests-2.x module.
41172a30f is described below

commit 41172a30f6867caff334f4ea2f2d4b200a79b193
Author: lvyanquan <[email protected]>
AuthorDate: Wed Mar 11 16:51:57 2026 +0800

    Remove flink-cdc-pipeline-e2e-tests-2.x module.
---
 .github/workflows/flink_cdc_ci.yml                 |   2 +-
 .github/workflows/modules.py                       |  11 +-
 .../flink-cdc-pipeline-e2e-tests-2.x/pom.xml       | 454 ------------------
 .../tests/utils/PipelineTestEnvironment.java       | 505 ---------------------
 .../cdc/pipeline/tests/utils/TarballFetcher.java   | 151 ------
 .../src/test/resources/docker/mysql/my.cnf         |  64 ---
 .../src/test/resources/docker/mysql/setup.sql      |  30 --
 .../src/test/resources/log4j2-test.properties      |  25 -
 .../flink-cdc-pipeline-e2e-tests/pom.xml           |  39 ++
 .../flink/cdc/pipeline/tests/ValuesE2eITCase.java  |  14 +-
 .../cdc/pipeline/tests/utils/TarballFetcher.java   |  30 +-
 flink-cdc-e2e-tests/pom.xml                        |   1 -
 .../sink/BatchDataSinkWriterOperator.java          |  23 +-
 .../operators/sink/DataSinkWriterOperator.java     |  23 +-
 14 files changed, 104 insertions(+), 1268 deletions(-)

diff --git a/.github/workflows/flink_cdc_ci.yml b/.github/workflows/flink_cdc_ci.yml
index 66d2f71be..f7514f7e0 100644
--- a/.github/workflows/flink_cdc_ci.yml
+++ b/.github/workflows/flink_cdc_ci.yml
@@ -108,7 +108,7 @@ jobs:
       java-versions: "[11]"
       flink-versions: "['2.2.0']"
       custom-maven-parameter: "-Pflink2"
-      modules: "['pipeline_e2e_2.x']"
+      modules: "['pipeline_e2e']"
       parallelism: ${{ matrix.parallelism }}
   source_e2e:
     name: Source E2E Tests
diff --git a/.github/workflows/modules.py b/.github/workflows/modules.py
index d06c36d02..86e94164f 100755
--- a/.github/workflows/modules.py
+++ b/.github/workflows/modules.py
@@ -144,10 +144,6 @@ MODULES_PIPELINE_E2E = [
     "flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests"
 ]
 
-MODULES_PIPELINE_E2E_2_X = [
-    "flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x"
-]
-
 MODULES_SOURCE_E2E = [
     "flink-cdc-e2e-tests/flink-cdc-source-e2e-tests"
 ]
@@ -182,18 +178,13 @@ ALL_MODULES = set(
     MODULES_SOURCE_E2E
 )
 
-# Modules that require the flink2 Maven profile to be activated
-ALL_MODULES_FLINK2 = ALL_MODULES | set(MODULES_PIPELINE_E2E_2_X)
-
 test_modules = set()
 compile_modules = set()
 
 for module in INPUT_MODULES.split(', '):
     module_list = set(globals()['MODULES_' + module.upper().replace('-', '_').replace('.', '_')])
     test_modules |= module_list
-    if module == 'pipeline_e2e_2.x':
-        compile_modules |= ALL_MODULES_FLINK2
-    elif module == 'source_e2e' or module == 'pipeline_e2e':
+    if module == 'source_e2e' or module == 'pipeline_e2e':
         compile_modules |= ALL_MODULES
     else:
         compile_modules |= module_list
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/pom.xml
deleted file mode 100644
index 2d2a11259..000000000
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/pom.xml
+++ /dev/null
@@ -1,454 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>flink-cdc-e2e-tests</artifactId>
-        <groupId>org.apache.flink</groupId>
-        <version>${revision}</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>flink-cdc-pipeline-e2e-tests-2.x</artifactId>
-
-    <properties>
-        <flink.version>${flink.2.x.version}</flink.version>
-        <mysql.driver.version>8.0.27</mysql.driver.version>
-        <flink.release.download.skip>true</flink.release.download.skip>
-        <skipITs>true</skipITs>
-        
<flink.release.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.release.name>
-        
<flink.release.mirror>https://dlcdn.apache.org/flink/flink-${flink.version}</flink.release.mirror>
-        <maven.plugin.download.version>1.6.8</maven.plugin.download.version>
-        <jmh.version>1.37</jmh.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-cdc-e2e-utils</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- Drivers -->
-        <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-            </exclusions>
-            <version>${mysql.driver.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- CDC connectors test utils - 2.x versions -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-cdc-dist</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-mysql-cdc</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-cdc-pipeline-connector-values</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-test-util</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-cdc-pipeline-udf-examples</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- testcontainers -->
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>testcontainers</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>junit-jupiter</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>mysql</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- benchmark -->
-        <dependency>
-            <groupId>org.openjdk.jmh</groupId>
-            <artifactId>jmh-core</artifactId>
-            <version>${jmh.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.openjdk.jmh</groupId>
-            <artifactId>jmh-generator-annprocess</artifactId>
-            <version>${jmh.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- test dependencies on TestContainers -->
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>postgresql</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- This is for testing Scala UDF.-->
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>${scala.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>kafka</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- mini yarn -->
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <!-- This dependency is no longer shipped with the JDK 
since Java 9.-->
-                    <groupId>jdk.tools</groupId>
-                    <artifactId>jdk.tools</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>ch.qos.reload4j</groupId>
-                    <artifactId>reload4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-reload4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>jdk.tools</groupId>
-                    <artifactId>jdk.tools</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-client</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <artifactId>jdk.tools</artifactId>
-                    <groupId>jdk.tools</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j</artifactId>
-                    <groupId>log4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-server-tests</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-            <exclusions>
-                <exclusion>
-                    <artifactId>jdk.tools</artifactId>
-                    <groupId>jdk.tools</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j</artifactId>
-                    <groupId>log4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-            <exclusions>
-                <exclusion>
-                    <artifactId>jdk.tools</artifactId>
-                    <groupId>jdk.tools</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j</artifactId>
-                    <groupId>log4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-yarn</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <testResources>
-            <testResource>
-                <directory>src/test/resources</directory>
-                <excludes>
-                    <exclude>**/flink-cdc.sh</exclude>
-                    <exclude>**/flink-cdc.yaml</exclude>
-                </excludes>
-            </testResource>
-        </testResources>
-        <plugins>
-            <!--
-                Skip tests in the test phase.
-                Tests will run in the integration-test phase after shading is 
complete.
-            -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <skip>true</skip>
-                </configuration>
-            </plugin>
-
-            <!--
-                Configure failsafe to run integration tests AFTER the package 
phase.
-                This ensures tests use the shaded artifacts where Guava and 
Flink APIs
-                have been relocated.
-            -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-failsafe-plugin</artifactId>
-                <version>3.0.0-M5</version>
-                <executions>
-                    <execution>
-                        <id>end-to-end-tests</id>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                        <configuration>
-                            <includes>
-                                <include>**/*.*</include>
-                            </includes>
-                            <excludes>
-                                
<exclude>**/MysqlE2eWithYarnApplicationITCase.java</exclude>
-                            </excludes>
-                            <forkCount>1</forkCount>
-                            <systemPropertyVariables>
-                                <moduleDir>${project.basedir}</moduleDir>
-                            </systemPropertyVariables>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>run-last-test</id>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                        <configuration>
-                            <includes>
-                                
<include>**/MysqlE2eWithYarnApplicationITCase.java</include>
-                            </includes>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>com.googlecode.maven-download-plugin</groupId>
-                <artifactId>download-maven-plugin</artifactId>
-                <version>1.6.8</version>
-                <configuration>
-                    
<cacheDirectory>${maven.plugin.download.cache.path}</cacheDirectory>
-                    
<outputDirectory>${project.build.directory}</outputDirectory>
-                    <readTimeOut>60000</readTimeOut>
-                    <retries>3</retries>
-                    <unpack>true</unpack>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>download-flink-release</id>
-                        <goals>
-                            <goal>wget</goal>
-                        </goals>
-                        <phase>test</phase>
-                        <configuration>
-                            <skip>${flink.release.download.skip}</skip>
-                            
<url>${flink.release.mirror}/${flink.release.name}</url>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>copy-jars</id>
-                        <phase>test</phase>
-                        <goals>
-                            <goal>copy</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>store-classpath-in-target-for-tests</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            
<outputFile>${project.build.directory}/yarn.classpath</outputFile>
-                            <excludeGroupIds>org.apache.flink</excludeGroupIds>
-                        </configuration>
-                    </execution>
-                </executions>
-                <configuration>
-                    <artifactItems>
-
-                        <artifactItem>
-                            <groupId>mysql</groupId>
-                            <artifactId>mysql-connector-java</artifactId>
-                            <version>${mysql.driver.version}</version>
-                            <destFileName>mysql-driver.jar</destFileName>
-                            <type>jar</type>
-                            
<outputDirectory>${project.build.directory}/dependencies
-                            </outputDirectory>
-                        </artifactItem>
-
-                        <artifactItem>
-                            <groupId>org.apache.flink</groupId>
-                            <artifactId>flink-cdc-dist</artifactId>
-                            <version>${project.version}</version>
-                            <destFileName>flink-cdc-dist.jar</destFileName>
-                            <type>jar</type>
-                            
<outputDirectory>${project.build.directory}/dependencies
-                            </outputDirectory>
-                        </artifactItem>
-
-                        <artifactItem>
-                            <groupId>org.apache.flink</groupId>
-                            
<artifactId>flink-cdc-pipeline-connector-values</artifactId>
-                            <version>${project.version}</version>
-                            
<destFileName>values-cdc-pipeline-connector.jar</destFileName>
-                            <type>jar</type>
-                            
<outputDirectory>${project.build.directory}/dependencies
-                            </outputDirectory>
-                        </artifactItem>
-
-                        <artifactItem>
-                            <groupId>org.apache.flink</groupId>
-                            
<artifactId>flink-cdc-pipeline-udf-examples</artifactId>
-                            <version>${project.version}</version>
-                            <destFileName>udf-examples.jar</destFileName>
-                            <type>jar</type>
-                            
<outputDirectory>${project.build.directory}/dependencies
-                            </outputDirectory>
-                        </artifactItem>
-
-                        <artifactItem>
-                            <groupId>org.scala-lang</groupId>
-                            <artifactId>scala-library</artifactId>
-                            <version>${scala.version}</version>
-                            <destFileName>scala-library.jar</destFileName>
-                            <type>jar</type>
-                            
<outputDirectory>${project.build.directory}/dependencies
-                            </outputDirectory>
-                        </artifactItem>
-                    </artifactItems>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-    <profiles>
-        <profile>
-            <id>flink2</id>
-            <properties>
-                
<flink.release.download.skip>false</flink.release.download.skip>
-                <skipITs>false</skipITs>
-            </properties>
-        </profile>
-    </profiles>
-</project>
\ No newline at end of file
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
deleted file mode 100644
index b1d57c778..000000000
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.pipeline.tests.utils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.cdc.common.test.utils.TestUtils;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
-import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.util.TestLogger;
-
-import com.github.dockerjava.api.DockerClient;
-import com.github.dockerjava.api.command.ExecCreateCmdResponse;
-import com.github.dockerjava.api.model.Volume;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.DockerClientFactory;
-import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.FrameConsumerResultCallback;
-import org.testcontainers.containers.output.OutputFrame;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.output.ToStringConsumer;
-import org.testcontainers.images.builder.Transferable;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
-import java.util.stream.Stream;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/** Test environment running pipeline job on Flink containers. */
-@Testcontainers
-public abstract class PipelineTestEnvironment extends TestLogger {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(PipelineTestEnvironment.class);
-
-    protected Integer parallelism = getParallelism();
-
-    private int getParallelism() {
-        try {
-            return 
Integer.parseInt(System.getProperty("specifiedParallelism"));
-        } catch (NumberFormatException ex) {
-            LOG.warn(
-                    "Unable to parse specified parallelism configuration ({} 
provided). Use 4 by default.",
-                    System.getProperty("specifiedParallelism"));
-            return 4;
-        }
-    }
-
-    // 
------------------------------------------------------------------------------------------
-    // MySQL Variables (we always use MySQL as the data source for easier 
verifying)
-    // 
------------------------------------------------------------------------------------------
-    protected static final String MYSQL_TEST_USER = "mysqluser";
-    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
-    protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
-    protected static final Duration EVENT_WAITING_TIMEOUT = 
Duration.ofMinutes(3);
-    protected static final Duration STARTUP_WAITING_TIMEOUT = 
Duration.ofMinutes(5);
-
-    public static final Network NETWORK = Network.newNetwork();
-
-    @Container
-    protected static final MySqlContainer MYSQL =
-            (MySqlContainer)
-                    new MySqlContainer(MySqlVersion.V8_0)
-                            .withConfigurationOverride("docker/mysql/my.cnf")
-                            .withSetupSQL("docker/mysql/setup.sql")
-                            .withDatabaseName("flink-test")
-                            .withUsername("flinkuser")
-                            .withPassword("flinkpw")
-                            .withNetwork(NETWORK)
-                            .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
-                            .withLogConsumer(new Slf4jLogConsumer(LOG));
-
-    // 
------------------------------------------------------------------------------------------
-    // Flink Variables
-    // 
------------------------------------------------------------------------------------------
-    protected static final int JOB_MANAGER_REST_PORT = 8081;
-    protected static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
-    protected static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
-    protected static final List<String> EXTERNAL_PROPS =
-            Arrays.asList(
-                    String.format("jobmanager.rpc.address: %s", 
INTER_CONTAINER_JM_ALIAS),
-                    "jobmanager.bind-host: 0.0.0.0",
-                    "taskmanager.bind-host: 0.0.0.0",
-                    "rest.bind-address: 0.0.0.0",
-                    "rest.address: 0.0.0.0",
-                    "jobmanager.memory.process.size: 1GB",
-                    "query.server.port: 6125",
-                    "blob.server.port: 6124",
-                    "taskmanager.numberOfTaskSlots: 10",
-                    "parallelism.default: 4",
-                    "execution.checkpointing.interval: 300",
-                    "state.backend.type: hashmap",
-                    "env.java.default-opts.all: 
--add-exports=java.base/sun.net.util=ALL-UNNAMED 
--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED 
--add-exports=java.sec [...]
-                    "execution.checkpointing.savepoint-dir: file:///opt/flink",
-                    "restart-strategy.type: off",
-                    "pekko.ask.timeout: 60s",
-                    // Set off-heap memory explicitly to avoid 
"java.lang.OutOfMemoryError: Direct
-                    // buffer memory" error.
-                    "taskmanager.memory.task.off-heap.size: 128mb",
-                    // Fix `java.lang.OutOfMemoryError: Metaspace. The 
metaspace out-of-memory error
-                    // has occurred` error.
-                    "taskmanager.memory.jvm-metaspace.size: 512mb");
-    public static final String FLINK_PROPERTIES = String.join("\n", 
EXTERNAL_PROPS);
-
-    @Nullable protected RestClusterClient<StandaloneClusterId> 
restClusterClient;
-
-    protected GenericContainer<?> jobManager;
-    protected GenericContainer<?> taskManager;
-    protected Volume sharedVolume = new Volume("/tmp/shared");
-
-    protected ToStringConsumer jobManagerConsumer;
-
-    protected ToStringConsumer taskManagerConsumer;
-
-    protected String flinkVersion = getFlinkVersion();
-
-    public static String getFlinkVersion() {
-        return "2.2.0";
-    }
-
-    protected List<String> copyJarToFlinkLib() {
-        return Collections.emptyList();
-    }
-
-    @BeforeEach
-    public void before() throws Exception {
-        LOG.info("Starting containers...");
-        jobManagerConsumer = new ToStringConsumer();
-        jobManager =
-                new GenericContainer<>(getFlinkDockerImageTag())
-                        .withCommand("jobmanager")
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
-                        .withExposedPorts(JOB_MANAGER_REST_PORT)
-                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-                        .withCreateContainerCmdModifier(cmd -> 
cmd.withVolumes(sharedVolume))
-                        .withLogConsumer(jobManagerConsumer);
-
-        List<String> jarToCopy = copyJarToFlinkLib();
-        if (!jarToCopy.isEmpty()) {
-            for (String jar : jarToCopy) {
-                jobManager.withCopyFileToContainer(
-                        MountableFile.forHostPath(TestUtils.getResource(jar)), 
"/opt/flink/lib/");
-            }
-        }
-
-        Startables.deepStart(Stream.of(jobManager)).join();
-        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", 
sharedVolume.toString());
-        LOG.info("JobManager is started.");
-
-        taskManagerConsumer = new ToStringConsumer();
-        taskManager =
-                new GenericContainer<>(getFlinkDockerImageTag())
-                        .withCommand("taskmanager")
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
-                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-                        .dependsOn(jobManager)
-                        .withVolumesFrom(jobManager, BindMode.READ_WRITE)
-                        .withLogConsumer(taskManagerConsumer);
-        Startables.deepStart(Stream.of(taskManager)).join();
-        runInContainerAsRoot(taskManager, "chmod", "0777", "-R", 
sharedVolume.toString());
-        LOG.info("TaskManager is started.");
-
-        TarballFetcher.fetchLatest(jobManager);
-        LOG.info("CDC executables deployed.");
-    }
-
-    @AfterEach
-    public void after() {
-        if (restClusterClient != null) {
-            restClusterClient.close();
-        }
-        if (jobManager != null) {
-            jobManager.stop();
-        }
-        if (taskManager != null) {
-            taskManager.stop();
-        }
-    }
-
-    /**
-     * Submits a YAML job to the running cluster with latest CDC version, 
without from previous
-     * savepoint states.
-     */
-    public JobID submitPipelineJob(String pipelineJob, Path... jars) throws 
Exception {
-        return submitPipelineJob(
-                TarballFetcher.CdcVersion.SNAPSHOT, pipelineJob, null, false, 
jars);
-    }
-
-    /**
-     * Submits a YAML job to the running cluster with specific CDC version, 
without from previous
-     * savepoint states.
-     */
-    public JobID submitPipelineJob(
-            TarballFetcher.CdcVersion version, String pipelineJob, Path... 
jars) throws Exception {
-        return submitPipelineJob(version, pipelineJob, null, false, jars);
-    }
-
-    /** Submits a YAML job to the running cluster with latest CDC version. */
-    public JobID submitPipelineJob(
-            String pipelineJob,
-            @Nullable String savepointPath,
-            boolean allowNonRestoredState,
-            Path... jars)
-            throws Exception {
-        return submitPipelineJob(
-                TarballFetcher.CdcVersion.SNAPSHOT,
-                pipelineJob,
-                savepointPath,
-                allowNonRestoredState,
-                jars);
-    }
-
-    public JobID submitPipelineJob(
-            TarballFetcher.CdcVersion version,
-            String pipelineJob,
-            @Nullable String savepointPath,
-            boolean allowNonRestoredState,
-            Path... jars)
-            throws Exception {
-
-        // Prepare external JAR dependencies
-        List<Path> paths = new ArrayList<>(Arrays.asList(jars));
-        List<String> containerPaths = new ArrayList<>();
-        paths.add(TestUtils.getResource("mysql-driver.jar"));
-
-        for (Path jar : paths) {
-            String containerPath = version.workDir() + "/lib/" + 
jar.getFileName();
-            jobManager.copyFileToContainer(MountableFile.forHostPath(jar), 
containerPath);
-            containerPaths.add(containerPath);
-        }
-
-        containerPaths.add(version.workDir() + 
"/lib/values-cdc-pipeline-connector.jar");
-
-        StringBuilder sb = new StringBuilder();
-        for (String containerPath : containerPaths) {
-            sb.append(" --jar ").append(containerPath);
-        }
-
-        jobManager.copyFileToContainer(
-                Transferable.of(pipelineJob), version.workDir() + 
"/conf/pipeline.yaml");
-
-        String commands =
-                version.workDir()
-                        + "/bin/flink-cdc.sh "
-                        + version.workDir()
-                        + "/conf/pipeline.yaml --flink-home /opt/flink"
-                        + sb;
-
-        if (savepointPath != null) {
-            commands += " --from-savepoint " + savepointPath;
-            if (allowNonRestoredState) {
-                commands += " --allow-nonRestored-state";
-            }
-        }
-        LOG.info("Execute command: {}", commands);
-        ExecResult execResult = executeAndCheck(jobManager, commands);
-        return Arrays.stream(execResult.getStdout().split("\n"))
-                .filter(line -> line.startsWith("Job ID: "))
-                .findFirst()
-                .map(line -> line.split(": ")[1])
-                .map(JobID::fromHexString)
-                .orElse(null);
-    }
-
-    public String stopJobWithSavepoint(JobID jobID) {
-        String savepointPath = "/opt/flink/";
-        ExecResult result =
-                executeAndCheck(
-                        jobManager,
-                        "flink",
-                        "stop",
-                        jobID.toHexString(),
-                        "--savepointPath",
-                        savepointPath);
-
-        return Arrays.stream(result.getStdout().split("\n"))
-                .filter(line -> line.startsWith("Savepoint completed."))
-                .findFirst()
-                .map(line -> line.split("Path: file:")[1])
-                .orElseThrow(
-                        () -> new RuntimeException("Failed to parse savepoint 
path from stdout."));
-    }
-
-    public void cancelJob(JobID jobID) {
-        executeAndCheck(jobManager, "flink", "cancel", jobID.toHexString());
-    }
-
-    /**
-     * Get {@link RestClusterClient} connected to this FlinkContainer.
-     *
-     * <p>This method lazily initializes the REST client on-demand.
-     */
-    public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
-        if (restClusterClient != null) {
-            return restClusterClient;
-        }
-        checkState(
-                jobManager.isRunning(),
-                "Cluster client should only be retrieved for a running 
cluster");
-        try {
-            final Configuration clientConfiguration = new Configuration();
-            clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost());
-            clientConfiguration.set(
-                    RestOptions.PORT, 
jobManager.getMappedPort(JOB_MANAGER_REST_PORT));
-            this.restClusterClient =
-                    new RestClusterClient<>(clientConfiguration, 
StandaloneClusterId.getInstance());
-        } catch (Exception e) {
-            throw new IllegalStateException(
-                    "Failed to create client for Flink container cluster", e);
-        }
-        return restClusterClient;
-    }
-
-    public void waitUntilJobRunning(Duration timeout) {
-        waitUntilJobState(timeout, JobStatus.RUNNING);
-    }
-
-    public void waitUntilJobFinished(Duration timeout) {
-        waitUntilJobState(timeout, JobStatus.FINISHED);
-    }
-
-    public void waitUntilJobState(Duration timeout, JobStatus expectedStatus) {
-        RestClusterClient<?> clusterClient = getRestClusterClient();
-        Deadline deadline = Deadline.fromNow(timeout);
-        while (deadline.hasTimeLeft()) {
-            Collection<JobStatusMessage> jobStatusMessages;
-            try {
-                jobStatusMessages = clusterClient.listJobs().get(10, 
TimeUnit.SECONDS);
-            } catch (Exception e) {
-                LOG.warn("Error when fetching job status.", e);
-                continue;
-            }
-            if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) {
-                JobStatusMessage message = jobStatusMessages.iterator().next();
-                JobStatus jobStatus = message.getJobState();
-                if (!expectedStatus.isTerminalState() && 
jobStatus.isTerminalState()) {
-                    throw new ValidationException(
-                            String.format(
-                                    "Job has been terminated! JobName: %s, 
JobID: %s, Status: %s",
-                                    message.getJobName(),
-                                    message.getJobId(),
-                                    message.getJobState()));
-                } else if (jobStatus == expectedStatus) {
-                    return;
-                }
-            }
-        }
-    }
-
-    protected String getFlinkDockerImageTag() {
-        if (System.getProperty("java.specification.version").equals("17")) {
-            return String.format("flink:%s-scala_2.12-java17", flinkVersion);
-        }
-        return String.format("flink:%s-scala_2.12-java11", flinkVersion);
-    }
-
-    private ExecResult executeAndCheck(GenericContainer<?> container, 
String... command) {
-        String joinedCommand = String.join(" ", command);
-        try {
-            LOG.info("Executing command {}", joinedCommand);
-            ExecResult execResult =
-                    container.execInContainer("bash", "-c", String.join(" ", 
command));
-            LOG.info(execResult.getStdout());
-            if (execResult.getExitCode() == 0) {
-                LOG.info("Command executed successfully.");
-                return execResult;
-            } else {
-                LOG.error(execResult.getStderr());
-                throw new AssertionError(
-                        "Failed when submitting the pipeline job.\n"
-                                + "Exit code: "
-                                + execResult.getExitCode()
-                                + "\n"
-                                + "StdOut: "
-                                + execResult.getStdout()
-                                + "\n"
-                                + "StdErr: "
-                                + execResult.getStderr());
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    "Failed to execute command " + joinedCommand + " in 
container " + container);
-        }
-    }
-
-    public void runInContainerAsRoot(GenericContainer<?> container, String... 
command)
-            throws InterruptedException {
-        ToStringConsumer stdoutConsumer = new ToStringConsumer();
-        ToStringConsumer stderrConsumer = new ToStringConsumer();
-        DockerClient dockerClient = DockerClientFactory.instance().client();
-        ExecCreateCmdResponse execCreateCmdResponse =
-                dockerClient
-                        .execCreateCmd(container.getContainerId())
-                        .withUser("root")
-                        .withCmd(command)
-                        .exec();
-        FrameConsumerResultCallback callback = new 
FrameConsumerResultCallback();
-        callback.addConsumer(OutputFrame.OutputType.STDOUT, stdoutConsumer);
-        callback.addConsumer(OutputFrame.OutputType.STDERR, stderrConsumer);
-        
dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(callback).awaitCompletion();
-    }
-
-    protected List<String> readLines(String resource) throws IOException {
-        final URL url = 
PipelineTestEnvironment.class.getClassLoader().getResource(resource);
-        assert url != null;
-        Path path = new File(url.getFile()).toPath();
-        return Files.readAllLines(path);
-    }
-
-    protected void validateResult(String... expectedEvents) throws Exception {
-        validateResult(Function.identity(), expectedEvents);
-    }
-
-    protected void validateResult(Function<String, String> mapper, String... 
expectedEvents)
-            throws Exception {
-        validateResult(
-                taskManagerConsumer, 
Stream.of(expectedEvents).map(mapper).toArray(String[]::new));
-    }
-
-    protected void validateResult(ToStringConsumer consumer, String... 
expectedEvents)
-            throws Exception {
-        for (String event : expectedEvents) {
-            waitUntilSpecificEvent(consumer, event);
-        }
-    }
-
-    protected void validateResult(
-            ToStringConsumer consumer, Function<String, String> mapper, 
String... expectedEvents)
-            throws Exception {
-        validateResult(consumer, 
Stream.of(expectedEvents).map(mapper).toArray(String[]::new));
-    }
-
-    protected void waitUntilSpecificEvent(String event) throws Exception {
-        waitUntilSpecificEvent(taskManagerConsumer, event);
-    }
-
-    protected void waitUntilSpecificEvent(ToStringConsumer consumer, String 
event)
-            throws Exception {
-        boolean result = false;
-        long endTimeout = System.currentTimeMillis() + 
EVENT_WAITING_TIMEOUT.toMillis();
-        while (System.currentTimeMillis() < endTimeout) {
-            String stdout = consumer.toUtf8String();
-            if (stdout.contains(event + "\n")) {
-                result = true;
-                break;
-            }
-            Thread.sleep(1000);
-        }
-        if (!result) {
-            throw new TimeoutException(
-                    "failed to get specific event: "
-                            + event
-                            + " from stdout: "
-                            + consumer.toUtf8String());
-        }
-    }
-}
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
deleted file mode 100644
index 8bd16d805..000000000
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.pipeline.tests.utils;
-
-import org.apache.flink.cdc.common.test.utils.TestUtils;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.utility.MountableFile;
-
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-
-/** Obtain and downloads corresponding Flink CDC tarball files. */
-public abstract class TarballFetcher {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(TarballFetcher.class);
-
-    public static void fetchAll(GenericContainer<?> container) throws 
Exception {
-        fetch(container, CdcVersion.values());
-    }
-
-    public static void fetchLatest(GenericContainer<?> container) throws 
Exception {
-        fetch(container, CdcVersion.SNAPSHOT);
-    }
-
-    public static void fetch(GenericContainer<?> container, CdcVersion... 
versions)
-            throws Exception {
-        for (CdcVersion version : versions) {
-            TarballFetcher.fetchInternal(container, version);
-        }
-    }
-
-    private static void fetchInternal(GenericContainer<?> container, 
CdcVersion version)
-            throws Exception {
-        LOG.info("Trying to download CDC tarball @ {}...", version);
-        if (CdcVersion.SNAPSHOT.equals(version)) {
-            LOG.info("CDC {} is a snapshot version, we should fetch it 
locally...", version);
-
-            container.copyFileToContainer(
-                    MountableFile.forHostPath(
-                            TestUtils.getResource("flink-cdc.sh", 
"flink-cdc-dist", "src"), 0755),
-                    version.workDir() + "/bin/flink-cdc.sh");
-            container.copyFileToContainer(
-                    MountableFile.forHostPath(
-                            TestUtils.getResource("flink-cdc.yaml", 
"flink-cdc-dist", "src"), 0755),
-                    version.workDir() + "/conf/flink-cdc.yaml");
-            container.copyFileToContainer(
-                    
MountableFile.forHostPath(TestUtils.getResource("flink-cdc-dist.jar")),
-                    version.workDir() + "/lib/flink-cdc-dist.jar");
-            container.copyFileToContainer(
-                    MountableFile.forHostPath(
-                            
TestUtils.getResource("values-cdc-pipeline-connector.jar")),
-                    version.workDir() + 
"/lib/values-cdc-pipeline-connector.jar");
-
-        } else {
-            LOG.info("CDC {} is a released version, download it from the 
Internet...", version);
-
-            String containerPath = "/tmp/tarball/" + version.getVersion() + 
".tar.gz";
-            downloadAndCopyToContainer(container, version.tarballUrl(), 
containerPath);
-            container.execInContainer("mkdir", "-p", version.workDir());
-            container.execInContainer(
-                    "tar", "-xzvf", containerPath, "-C", version.workDir(), 
"--strip-components=1");
-
-            downloadAndCopyToContainer(
-                    container,
-                    version.connectorJarUrl("values"),
-                    version.workDir() + 
"/lib/values-cdc-pipeline-connector.jar");
-        }
-    }
-
-    private static void downloadAndCopyToContainer(
-            GenericContainer<?> container, String url, String containerPath) 
throws Exception {
-        Path tempFile = Files.createTempFile("download-", ".tmp");
-        FileUtils.copyURLToFile(
-                new URL(url),
-                tempFile.toFile(),
-                (int) Duration.ofMinutes(1).toMillis(),
-                (int) Duration.ofMinutes(5).toMillis());
-        container.copyFileToContainer(MountableFile.forHostPath(tempFile), 
containerPath);
-    }
-
-    /** Enum for all released Flink CDC version tags. */
-    public enum CdcVersion {
-        V3_1_1("3.1.1"),
-        V3_2_0("3.2.0"),
-        V3_2_1("3.2.1"),
-        V3_3_0("3.3.0"),
-        V3_4_0("3.4.0"),
-        V3_5_0("3.5.0"),
-        SNAPSHOT("SNAPSHOT");
-
-        private final String version;
-
-        CdcVersion(String version) {
-            this.version = version;
-        }
-
-        public String getVersion() {
-            return version;
-        }
-
-        public static List<CdcVersion> getAllVersions() {
-            return Arrays.asList(CdcVersion.values());
-        }
-
-        public static List<CdcVersion> getVersionsSince(CdcVersion version) {
-            return getAllVersions()
-                    .subList(getAllVersions().indexOf(version), 
getAllVersions().size());
-        }
-
-        public String tarballUrl() {
-            return "https://dlcdn.apache.org/flink/flink-cdc-";
-                    + version
-                    + "/flink-cdc-"
-                    + version
-                    + "-bin.tar.gz";
-        }
-
-        public String workDir() {
-            return "/tmp/cdc/" + version;
-        }
-
-        public String connectorJarUrl(String name) {
-            return String.format(
-                    
"https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-%s/%s/flink-cdc-pipeline-connector-%s-%s.jar";,
-                    name, version, name, version);
-        }
-    }
-}
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/my.cnf
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/my.cnf
deleted file mode 100644
index 26d1abad6..000000000
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/my.cnf
+++ /dev/null
@@ -1,64 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# For advice on how to change settings please see
-# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
-
-[mysqld]
-#
-# Remove leading # and set to the amount of RAM for the most important data
-# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
-# innodb_buffer_pool_size = 128M
-#
-# Remove leading # to turn on a very important data integrity option: logging
-# changes to the binary log between backups.
-# log_bin
-#
-# Remove leading # to set options mainly useful for reporting servers.
-# The server defaults are faster for transactions and fast SELECTs.
-# Adjust sizes as needed, experiment to find the optimal values.
-# join_buffer_size = 128M
-# sort_buffer_size = 2M
-# read_rnd_buffer_size = 2M
-skip-host-cache
-skip-name-resolve
-#datadir=/var/lib/mysql
-#socket=/var/lib/mysql/mysql.sock
-#secure-file-priv=/var/lib/mysql-files
-secure-file-priv=/var/lib/mysql
-user=mysql
-
-# Disabling symbolic-links is recommended to prevent assorted security risks
-symbolic-links=0
-
-#log-error=/var/log/mysqld.log
-#pid-file=/var/run/mysqld/mysqld.pid
-
-# ----------------------------------------------
-# Enable the binlog for replication & CDC
-# ----------------------------------------------
-
-# Enable binary replication log and set the prefix, expiration, and log format.
-# The prefix is arbitrary, expiration can be short for integration tests but 
would
-# be longer on a production system. Row-level info is required for ingest to 
work.
-# Server ID is required, but this will vary on production systems
-server-id         = 223344
-log_bin           = mysql-bin
-expire_logs_days  = 1
-binlog_format     = row
-
-# enable gtid mode
-gtid_mode = on
-enforce_gtid_consistency = on
\ No newline at end of file
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/setup.sql
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/setup.sql
deleted file mode 100644
index 3c0cccb4b..000000000
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/setup.sql
+++ /dev/null
@@ -1,30 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements.  See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
--- In production you would almost certainly limit the replication user must be 
on the follower (slave) machine,
--- to prevent other clients accessing the log from other machines. For 
example, 'replicator'@'follower.acme.com'.
--- However, in this database we'll grant 2 users different privileges:
---
--- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog 
reader (used for testing)
--- 2) 'mysqluser' - all privileges
---
-GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, 
LOCK TABLES  ON *.* TO 'flinkuser'@'%';
-CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
-GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
-
--- 
----------------------------------------------------------------------------------------------------------------
--- DATABASE:  emptydb
--- 
----------------------------------------------------------------------------------------------------------------
-CREATE DATABASE emptydb;
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/log4j2-test.properties
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/log4j2-test.properties
deleted file mode 100644
index 32df1c025..000000000
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/log4j2-test.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#      http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to ERROR to not flood build logs
-# set manually to INFO for debugging purposes
-rootLogger.level=ERROR
-rootLogger.appenderRef.test.ref = TestLogger
-
-appender.testlogger.name = TestLogger
-appender.testlogger.type = CONSOLE
-appender.testlogger.target = SYSTEM_ERR
-appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 8c263f8de..897d1acc3 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -805,4 +805,43 @@ limitations under the License.
             </plugin>
         </plugins>
     </build>
+
+    <profiles>
+        <profile>
+            <id>flink2</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>default-test</id>
+                                <phase>none</phase>
+                            </execution>
+                            <execution>
+                                <id>integration-tests</id>
+                                <phase>none</phase>
+                            </execution>
+                            <execution>
+                                <id>end-to-end-tests</id>
+                                <configuration>
+                                    <includes>
+                                        
<include>**/ValuesE2eITCase.java</include>
+                                    </includes>
+                                    <excludes>
+                                        
<exclude>**/MysqlE2eWithYarnApplicationITCase.java</exclude>
+                                    </excludes>
+                                </configuration>
+                            </execution>
+                            <execution>
+                                <id>run-last-test</id>
+                                <phase>none</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
similarity index 97%
rename from 
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
rename to 
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
index 7d863333d..0f7bc28cb 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
@@ -47,7 +47,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
 
         submitPipelineJob(pipelineJob);
         waitUntilJobFinished(Duration.ofSeconds(30));
-        LOG.info("Pipeline job is running");
+        LOG.info("Pipeline job has finished");
 
         validateResult(
                 
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -79,7 +79,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
 
         submitPipelineJob(pipelineJob);
         waitUntilJobFinished(Duration.ofSeconds(30));
-        LOG.info("Pipeline job is running");
+        LOG.info("Pipeline job has finished");
 
         validateResult(
                 
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -115,7 +115,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
 
         submitPipelineJob(pipelineJob);
         waitUntilJobFinished(Duration.ofSeconds(30));
-        LOG.info("Pipeline job is running");
+        LOG.info("Pipeline job has finished");
 
         validateResult(
                 
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -151,7 +151,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
 
         submitPipelineJob(pipelineJob);
         waitUntilJobFinished(Duration.ofSeconds(30));
-        LOG.info("Pipeline job is running");
+        LOG.info("Pipeline job has finished");
 
         validateResult(
                 
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -186,7 +186,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
 
         submitPipelineJob(pipelineJob);
         waitUntilJobFinished(Duration.ofSeconds(30));
-        LOG.info("Pipeline job is running");
+        LOG.info("Pipeline job has finished");
 
         validateResult(
                 
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -214,7 +214,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
 
         submitPipelineJob(pipelineJob);
         waitUntilJobFinished(Duration.ofSeconds(30));
-        LOG.info("Pipeline job is running");
+        LOG.info("Pipeline job has finished");
 
         validateResult(
                 "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -246,7 +246,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
 
         submitPipelineJob(pipelineJob);
         waitUntilJobFinished(Duration.ofSeconds(30));
-        LOG.info("Pipeline job is running");
+        LOG.info("Pipeline job has finished");
 
         validateResult(
                 "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
index 1d5788918..450f683d6 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
@@ -19,10 +19,16 @@ package org.apache.flink.cdc.pipeline.tests.utils;
 
 import org.apache.flink.cdc.common.test.utils.TestUtils;
 
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
 import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.FrameConsumerResultCallback;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.ToStringConsumer;
 import org.testcontainers.utility.MountableFile;
 
 import java.net.URL;
@@ -60,11 +66,11 @@ public abstract class TarballFetcher {
 
             container.copyFileToContainer(
                     MountableFile.forHostPath(
-                            TestUtils.getResource("flink-cdc.sh", "flink-cdc-dist", "src"), 755),
+                            TestUtils.getResource("flink-cdc.sh", "flink-cdc-dist", "src")),
                     version.workDir() + "/bin/flink-cdc.sh");
             container.copyFileToContainer(
                     MountableFile.forHostPath(
-                            TestUtils.getResource("flink-cdc.yaml", "flink-cdc-dist", "src"), 755),
+                            TestUtils.getResource("flink-cdc.yaml", "flink-cdc-dist", "src")),
                     version.workDir() + "/conf/flink-cdc.yaml");
             container.copyFileToContainer(
                     MountableFile.forHostPath(TestUtils.getResource("flink-cdc-dist.jar")),
@@ -78,6 +84,9 @@ public abstract class TarballFetcher {
                             TestUtils.getResource("values-cdc-pipeline-connector.jar")),
                     version.workDir() + "/lib/values-cdc-pipeline-connector.jar");
 
+            // Ensure the script has execute permission
+            runInContainerAsRoot(container, "chmod", "+x", version.workDir() + "/bin/flink-cdc.sh");
+
         } else {
             LOG.info("CDC {} is a released version, download it from the Internet...", version);
 
@@ -109,6 +118,23 @@ public abstract class TarballFetcher {
         container.copyFileToContainer(MountableFile.forHostPath(tempFile), containerPath);
     }
 
+    private static void runInContainerAsRoot(GenericContainer<?> container, String... command)
+            throws InterruptedException {
+        ToStringConsumer stdoutConsumer = new ToStringConsumer();
+        ToStringConsumer stderrConsumer = new ToStringConsumer();
+        DockerClient dockerClient = DockerClientFactory.instance().client();
+        ExecCreateCmdResponse execCreateCmdResponse =
+                dockerClient
+                        .execCreateCmd(container.getContainerId())
+                        .withUser("root")
+                        .withCmd(command)
+                        .exec();
+        FrameConsumerResultCallback callback = new FrameConsumerResultCallback();
+        callback.addConsumer(OutputFrame.OutputType.STDOUT, stdoutConsumer);
+        callback.addConsumer(OutputFrame.OutputType.STDERR, stderrConsumer);
+        dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(callback).awaitCompletion();
+    }
+
     /** Enum for all released Flink CDC version tags. */
     public enum CdcVersion {
         V3_1_1("3.1.1"),
diff --git a/flink-cdc-e2e-tests/pom.xml b/flink-cdc-e2e-tests/pom.xml
index 155c6b735..28d16beee 100644
--- a/flink-cdc-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/pom.xml
@@ -33,7 +33,6 @@ limitations under the License.
         <module>flink-cdc-e2e-utils</module>
         <module>flink-cdc-source-e2e-tests</module>
         <module>flink-cdc-pipeline-e2e-tests</module>
-        <module>flink-cdc-pipeline-e2e-tests-2.x</module>
     </modules>
 
     <build>
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java
index 6ea1ceaee..e89f3f814 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java
@@ -180,24 +180,26 @@ public class BatchDataSinkWriterOperator<CommT>
     }
 
     private Object createFlinkWriterOperator() {
+        Class<?> flinkWriterClass;
         try {
-            Class<?> flinkWriterClass =
+            flinkWriterClass =
                     getRuntimeContext()
                             .getUserCodeClassLoader()
                             .loadClass(
                                     "org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Failed to load SinkWriterOperator class", e);
+        }
+
+        try {
             Constructor<?> constructor =
                     flinkWriterClass.getDeclaredConstructor(
                             Sink.class, ProcessingTimeService.class, MailboxExecutor.class);
             constructor.setAccessible(true);
             return constructor.newInstance(sink, processingTimeService, mailboxExecutor);
-        } catch (Exception ignore) {
+        } catch (NoSuchMethodException e) {
+            // Constructor with 3 parameters not found, try the 4-parameter version
             try {
-                Class<?> flinkWriterClass =
-                        getRuntimeContext()
-                                .getUserCodeClassLoader()
-                                .loadClass(
-                                        "org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator");
                 Constructor<?> constructor =
                         flinkWriterClass.getDeclaredConstructor(
                                 StreamOperatorParameters.class,
@@ -206,9 +208,12 @@ public class BatchDataSinkWriterOperator<CommT>
                                 MailboxExecutor.class);
                 constructor.setAccessible(true);
                 return constructor.newInstance(null, sink, processingTimeService, mailboxExecutor);
-            } catch (Exception e) {
-                throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e);
+            } catch (Exception ex) {
+                throw new RuntimeException("Failed to create SinkWriterOperator in Flink", ex);
             }
+        } catch (Exception e) {
+            // Other exceptions (e.g., InvocationTargetException) indicate real failures
+            throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e);
         }
     }
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
index 0c45b57c2..bac2929ce 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
@@ -265,24 +265,26 @@ public class DataSinkWriterOperator<CommT>
     }
 
     private Object createFlinkWriterOperator() {
+        Class<?> flinkWriterClass;
         try {
-            Class<?> flinkWriterClass =
+            flinkWriterClass =
                     getRuntimeContext()
                             .getUserCodeClassLoader()
                             .loadClass(
                                     "org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Failed to load SinkWriterOperator class", e);
+        }
+
+        try {
             Constructor<?> constructor =
                     flinkWriterClass.getDeclaredConstructor(
                             Sink.class, ProcessingTimeService.class, MailboxExecutor.class);
             constructor.setAccessible(true);
             return constructor.newInstance(sink, processingTimeService, mailboxExecutor);
-        } catch (Exception ignore) {
+        } catch (NoSuchMethodException e) {
+            // Constructor with 3 parameters not found, try the 4-parameter version
             try {
-                Class<?> flinkWriterClass =
-                        getRuntimeContext()
-                                .getUserCodeClassLoader()
-                                .loadClass(
-                                        "org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator");
                 Constructor<?> constructor =
                         flinkWriterClass.getDeclaredConstructor(
                                 StreamOperatorParameters.class,
@@ -291,9 +293,12 @@ public class DataSinkWriterOperator<CommT>
                                 MailboxExecutor.class);
                 constructor.setAccessible(true);
                 return constructor.newInstance(null, sink, processingTimeService, mailboxExecutor);
-            } catch (Exception e) {
-                throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e);
+            } catch (Exception ex) {
+                throw new RuntimeException("Failed to create SinkWriterOperator in Flink", ex);
             }
+        } catch (Exception e) {
+            // Other exceptions (e.g., InvocationTargetException) indicate real failures
+            throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e);
         }
     }
 

Reply via email to