This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch FLINK-38729-2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 45948829ed3048c893a581a76529082e16024649 Author: lvyanquan <[email protected]> AuthorDate: Wed Mar 11 16:51:57 2026 +0800 Remove flink-cdc-pipeline-e2e-tests-2.x module. --- .github/workflows/flink_cdc_ci.yml | 2 +- .github/workflows/modules.py | 11 +- .../flink-cdc-pipeline-e2e-tests-2.x/pom.xml | 454 ------------------ .../tests/utils/PipelineTestEnvironment.java | 505 --------------------- .../cdc/pipeline/tests/utils/TarballFetcher.java | 151 ------ .../src/test/resources/docker/mysql/my.cnf | 64 --- .../src/test/resources/docker/mysql/setup.sql | 30 -- .../src/test/resources/log4j2-test.properties | 25 - .../flink-cdc-pipeline-e2e-tests/pom.xml | 39 ++ .../flink/cdc/pipeline/tests/ValuesE2eITCase.java | 14 +- .../cdc/pipeline/tests/utils/TarballFetcher.java | 30 +- flink-cdc-e2e-tests/pom.xml | 1 - .../sink/BatchDataSinkWriterOperator.java | 23 +- .../operators/sink/DataSinkWriterOperator.java | 23 +- 14 files changed, 104 insertions(+), 1268 deletions(-) diff --git a/.github/workflows/flink_cdc_ci.yml b/.github/workflows/flink_cdc_ci.yml index 66d2f71be..f7514f7e0 100644 --- a/.github/workflows/flink_cdc_ci.yml +++ b/.github/workflows/flink_cdc_ci.yml @@ -108,7 +108,7 @@ jobs: java-versions: "[11]" flink-versions: "['2.2.0']" custom-maven-parameter: "-Pflink2" - modules: "['pipeline_e2e_2.x']" + modules: "['pipeline_e2e']" parallelism: ${{ matrix.parallelism }} source_e2e: name: Source E2E Tests diff --git a/.github/workflows/modules.py b/.github/workflows/modules.py index d06c36d02..86e94164f 100755 --- a/.github/workflows/modules.py +++ b/.github/workflows/modules.py @@ -144,10 +144,6 @@ MODULES_PIPELINE_E2E = [ "flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests" ] -MODULES_PIPELINE_E2E_2_X = [ - "flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x" -] - MODULES_SOURCE_E2E = [ "flink-cdc-e2e-tests/flink-cdc-source-e2e-tests" ] @@ -182,18 +178,13 @@ ALL_MODULES = set( MODULES_SOURCE_E2E ) -# Modules that require the flink2 Maven profile to be activated -ALL_MODULES_FLINK2 = ALL_MODULES | set(MODULES_PIPELINE_E2E_2_X) - test_modules = set() compile_modules = set() for module in INPUT_MODULES.split(', '): module_list = set(globals()['MODULES_' + module.upper().replace('-', '_').replace('.', '_')]) test_modules |= module_list - if module == 'pipeline_e2e_2.x': - compile_modules |= ALL_MODULES_FLINK2 - elif module == 'source_e2e' or module == 'pipeline_e2e': + if module == 'source_e2e' or module == 'pipeline_e2e': compile_modules |= ALL_MODULES else: compile_modules |= module_list diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/pom.xml deleted file mode 100644 index 2d2a11259..000000000 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/pom.xml +++ /dev/null @@ -1,454 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>flink-cdc-e2e-tests</artifactId> - <groupId>org.apache.flink</groupId> - <version>${revision}</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>flink-cdc-pipeline-e2e-tests-2.x</artifactId> - - <properties> - <flink.version>${flink.2.x.version}</flink.version> - <mysql.driver.version>8.0.27</mysql.driver.version> - <flink.release.download.skip>true</flink.release.download.skip> - <skipITs>true</skipITs> - <flink.release.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.release.name> - <flink.release.mirror>https://dlcdn.apache.org/flink/flink-${flink.version}</flink.release.mirror> - <maven.plugin.download.version>1.6.8</maven.plugin.download.version> - <jmh.version>1.37</jmh.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-cdc-e2e-utils</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <!-- Drivers --> - <dependency> - <groupId>mysql</groupId> - <artifactId>mysql-connector-java</artifactId> - <exclusions> - <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - </exclusions> - <version>${mysql.driver.version}</version> - <scope>test</scope> - </dependency> - - <!-- CDC connectors test utils - 2.x versions --> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-cdc-dist</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-mysql-cdc</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-cdc-pipeline-connector-values</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-test-util</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-cdc-pipeline-udf-examples</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <!-- testcontainers --> - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>testcontainers</artifactId> - <version>${testcontainers.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>junit-jupiter</artifactId> - <version>${testcontainers.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>mysql</artifactId> - <version>${testcontainers.version}</version> - <scope>test</scope> - </dependency> - - <!-- benchmark --> - <dependency> - <groupId>org.openjdk.jmh</groupId> - <artifactId>jmh-core</artifactId> - <version>${jmh.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.openjdk.jmh</groupId> - <artifactId>jmh-generator-annprocess</artifactId> - <version>${jmh.version}</version> - <scope>test</scope> - </dependency> - - <!-- test dependencies on TestContainers --> - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>postgresql</artifactId> - <version>${testcontainers.version}</version> - <scope>test</scope> - </dependency> - - <!-- This is for testing Scala UDF.--> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>kafka</artifactId> - <version>${testcontainers.version}</version> - <scope>test</scope> - </dependency> - - <!-- mini yarn --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <!-- This dependency is no longer shipped with the JDK since Java 9.--> - <groupId>jdk.tools</groupId> - <artifactId>jdk.tools</artifactId> - </exclusion> - <exclusion> - <groupId>ch.qos.reload4j</groupId> - <artifactId>reload4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-reload4j</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>jdk.tools</groupId> - <artifactId>jdk.tools</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-client</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <artifactId>jdk.tools</artifactId> - <groupId>jdk.tools</groupId> - </exclusion> - <exclusion> - <artifactId>log4j</artifactId> - <groupId>log4j</groupId> - </exclusion> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-tests</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - <type>test-jar</type> - <exclusions> - <exclusion> - <artifactId>jdk.tools</artifactId> - <groupId>jdk.tools</groupId> - </exclusion> - <exclusion> - <artifactId>log4j</artifactId> - <groupId>log4j</groupId> - </exclusion> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - <type>test-jar</type> - <exclusions> - <exclusion> - <artifactId>jdk.tools</artifactId> - <groupId>jdk.tools</groupId> - </exclusion> - <exclusion> - <artifactId>log4j</artifactId> - <groupId>log4j</groupId> - </exclusion> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-yarn</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <testResources> - <testResource> - <directory>src/test/resources</directory> - <excludes> - <exclude>**/flink-cdc.sh</exclude> - <exclude>**/flink-cdc.yaml</exclude> - </excludes> - </testResource> - </testResources> - <plugins> - <!-- - Skip tests in the test phase. - Tests will run in the integration-test phase after shading is complete. - --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - - <!-- - Configure failsafe to run integration tests AFTER the package phase. - This ensures tests use the shaded artifacts where Guava and Flink APIs - have been relocated. - --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-failsafe-plugin</artifactId> - <version>3.0.0-M5</version> - <executions> - <execution> - <id>end-to-end-tests</id> - <goals> - <goal>integration-test</goal> - <goal>verify</goal> - </goals> - <configuration> - <includes> - <include>**/*.*</include> - </includes> - <excludes> - <exclude>**/MysqlE2eWithYarnApplicationITCase.java</exclude> - </excludes> - <forkCount>1</forkCount> - <systemPropertyVariables> - <moduleDir>${project.basedir}</moduleDir> - </systemPropertyVariables> - </configuration> - </execution> - <execution> - <id>run-last-test</id> - <goals> - <goal>integration-test</goal> - <goal>verify</goal> - </goals> - <configuration> - <includes> - <include>**/MysqlE2eWithYarnApplicationITCase.java</include> - </includes> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>com.googlecode.maven-download-plugin</groupId> - <artifactId>download-maven-plugin</artifactId> - <version>1.6.8</version> - <configuration> - <cacheDirectory>${maven.plugin.download.cache.path}</cacheDirectory> - <outputDirectory>${project.build.directory}</outputDirectory> - <readTimeOut>60000</readTimeOut> - <retries>3</retries> - <unpack>true</unpack> - </configuration> - <executions> - <execution> - <id>download-flink-release</id> - <goals> - <goal>wget</goal> - </goals> - <phase>test</phase> - <configuration> - <skip>${flink.release.download.skip}</skip> - <url>${flink.release.mirror}/${flink.release.name}</url> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <id>copy-jars</id> - <phase>test</phase> - <goals> - <goal>copy</goal> - </goals> - </execution> - <execution> - <id>store-classpath-in-target-for-tests</id> - <phase>process-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <outputFile>${project.build.directory}/yarn.classpath</outputFile> - <excludeGroupIds>org.apache.flink</excludeGroupIds> - </configuration> - </execution> - </executions> - <configuration> - <artifactItems> - - <artifactItem> - <groupId>mysql</groupId> - <artifactId>mysql-connector-java</artifactId> - <version>${mysql.driver.version}</version> - <destFileName>mysql-driver.jar</destFileName> - <type>jar</type> - <outputDirectory>${project.build.directory}/dependencies - </outputDirectory> - </artifactItem> - - <artifactItem> - <groupId>org.apache.flink</groupId> - <artifactId>flink-cdc-dist</artifactId> - <version>${project.version}</version> - <destFileName>flink-cdc-dist.jar</destFileName> - <type>jar</type> - <outputDirectory>${project.build.directory}/dependencies - </outputDirectory> - </artifactItem> - - <artifactItem> - <groupId>org.apache.flink</groupId> - <artifactId>flink-cdc-pipeline-connector-values</artifactId> - <version>${project.version}</version> - <destFileName>values-cdc-pipeline-connector.jar</destFileName> - <type>jar</type> - <outputDirectory>${project.build.directory}/dependencies - </outputDirectory> - </artifactItem> - - <artifactItem> - <groupId>org.apache.flink</groupId> - <artifactId>flink-cdc-pipeline-udf-examples</artifactId> - <version>${project.version}</version> - <destFileName>udf-examples.jar</destFileName> - <type>jar</type> - <outputDirectory>${project.build.directory}/dependencies - </outputDirectory> - </artifactItem> - - <artifactItem> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.version}</version> - <destFileName>scala-library.jar</destFileName> - <type>jar</type> - <outputDirectory>${project.build.directory}/dependencies - </outputDirectory> - </artifactItem> - </artifactItems> - </configuration> - </plugin> - </plugins> - </build> - - <profiles> - <profile> - <id>flink2</id> - <properties> - <flink.release.download.skip>false</flink.release.download.skip> - <skipITs>false</skipITs> - </properties> - </profile> - </profiles> -</project> \ No newline at end of file diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java deleted file mode 100644 index b1d57c778..000000000 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java +++ /dev/null @@ -1,505 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.pipeline.tests.utils; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Deadline; -import org.apache.flink.cdc.common.test.utils.TestUtils; -import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; -import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; -import org.apache.flink.client.deployment.StandaloneClusterId; -import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.RestOptions; -import org.apache.flink.runtime.client.JobStatusMessage; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.util.TestLogger; - -import com.github.dockerjava.api.DockerClient; -import com.github.dockerjava.api.command.ExecCreateCmdResponse; -import com.github.dockerjava.api.model.Volume; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.DockerClientFactory; -import org.testcontainers.containers.BindMode; -import org.testcontainers.containers.Container.ExecResult; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.FrameConsumerResultCallback; -import org.testcontainers.containers.output.OutputFrame; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.output.ToStringConsumer; -import org.testcontainers.images.builder.Transferable; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.MountableFile; - -import javax.annotation.Nullable; - -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Function; -import java.util.stream.Stream; - -import static org.apache.flink.util.Preconditions.checkState; - -/** Test environment running pipeline job on Flink containers. */ -@Testcontainers -public abstract class PipelineTestEnvironment extends TestLogger { - - private static final Logger LOG = LoggerFactory.getLogger(PipelineTestEnvironment.class); - - protected Integer parallelism = getParallelism(); - - private int getParallelism() { - try { - return Integer.parseInt(System.getProperty("specifiedParallelism")); - } catch (NumberFormatException ex) { - LOG.warn( - "Unable to parse specified parallelism configuration ({} provided). Use 4 by default.", - System.getProperty("specifiedParallelism")); - return 4; - } - } - - // ------------------------------------------------------------------------------------------ - // MySQL Variables (we always use MySQL as the data source for easier verifying) - // ------------------------------------------------------------------------------------------ - protected static final String MYSQL_TEST_USER = "mysqluser"; - protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; - protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; - protected static final Duration EVENT_WAITING_TIMEOUT = Duration.ofMinutes(3); - protected static final Duration STARTUP_WAITING_TIMEOUT = Duration.ofMinutes(5); - - public static final Network NETWORK = Network.newNetwork(); - - @Container - protected static final MySqlContainer MYSQL = - (MySqlContainer) - new MySqlContainer(MySqlVersion.V8_0) - .withConfigurationOverride("docker/mysql/my.cnf") - .withSetupSQL("docker/mysql/setup.sql") - .withDatabaseName("flink-test") - .withUsername("flinkuser") - .withPassword("flinkpw") - .withNetwork(NETWORK) - .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - // ------------------------------------------------------------------------------------------ - // Flink Variables - // ------------------------------------------------------------------------------------------ - protected static final int JOB_MANAGER_REST_PORT = 8081; - protected static final String INTER_CONTAINER_JM_ALIAS = "jobmanager"; - protected static final String INTER_CONTAINER_TM_ALIAS = "taskmanager"; - protected static final List<String> EXTERNAL_PROPS = - Arrays.asList( - String.format("jobmanager.rpc.address: %s", INTER_CONTAINER_JM_ALIAS), - "jobmanager.bind-host: 0.0.0.0", - "taskmanager.bind-host: 0.0.0.0", - "rest.bind-address: 0.0.0.0", - "rest.address: 0.0.0.0", - "jobmanager.memory.process.size: 1GB", - "query.server.port: 6125", - "blob.server.port: 6124", - "taskmanager.numberOfTaskSlots: 10", - "parallelism.default: 4", - "execution.checkpointing.interval: 300", - "state.backend.type: hashmap", - "env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.sec [...] - "execution.checkpointing.savepoint-dir: file:///opt/flink", - "restart-strategy.type: off", - "pekko.ask.timeout: 60s", - // Set off-heap memory explicitly to avoid "java.lang.OutOfMemoryError: Direct - // buffer memory" error. - "taskmanager.memory.task.off-heap.size: 128mb", - // Fix `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error - // has occurred` error. - "taskmanager.memory.jvm-metaspace.size: 512mb"); - public static final String FLINK_PROPERTIES = String.join("\n", EXTERNAL_PROPS); - - @Nullable protected RestClusterClient<StandaloneClusterId> restClusterClient; - - protected GenericContainer<?> jobManager; - protected GenericContainer<?> taskManager; - protected Volume sharedVolume = new Volume("/tmp/shared"); - - protected ToStringConsumer jobManagerConsumer; - - protected ToStringConsumer taskManagerConsumer; - - protected String flinkVersion = getFlinkVersion(); - - public static String getFlinkVersion() { - return "2.2.0"; - } - - protected List<String> copyJarToFlinkLib() { - return Collections.emptyList(); - } - - @BeforeEach - public void before() throws Exception { - LOG.info("Starting containers..."); - jobManagerConsumer = new ToStringConsumer(); - jobManager = - new GenericContainer<>(getFlinkDockerImageTag()) - .withCommand("jobmanager") - .withNetwork(NETWORK) - .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) - .withExposedPorts(JOB_MANAGER_REST_PORT) - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume)) - .withLogConsumer(jobManagerConsumer); - - List<String> jarToCopy = copyJarToFlinkLib(); - if (!jarToCopy.isEmpty()) { - for (String jar : jarToCopy) { - jobManager.withCopyFileToContainer( - MountableFile.forHostPath(TestUtils.getResource(jar)), "/opt/flink/lib/"); - } - } - - Startables.deepStart(Stream.of(jobManager)).join(); - runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString()); - LOG.info("JobManager is started."); - - taskManagerConsumer = new ToStringConsumer(); - taskManager = - new GenericContainer<>(getFlinkDockerImageTag()) - .withCommand("taskmanager") - .withNetwork(NETWORK) - .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .dependsOn(jobManager) - .withVolumesFrom(jobManager, BindMode.READ_WRITE) - .withLogConsumer(taskManagerConsumer); - Startables.deepStart(Stream.of(taskManager)).join(); - runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString()); - LOG.info("TaskManager is started."); - - TarballFetcher.fetchLatest(jobManager); - LOG.info("CDC executables deployed."); - } - - @AfterEach - public void after() { - if (restClusterClient != null) { - restClusterClient.close(); - } - if (jobManager != null) { - jobManager.stop(); - } - if (taskManager != null) { - taskManager.stop(); - } - } - - /** - * Submits a YAML job to the running cluster with latest CDC version, without from previous - * savepoint states. - */ - public JobID submitPipelineJob(String pipelineJob, Path... jars) throws Exception { - return submitPipelineJob( - TarballFetcher.CdcVersion.SNAPSHOT, pipelineJob, null, false, jars); - } - - /** - * Submits a YAML job to the running cluster with specific CDC version, without from previous - * savepoint states. - */ - public JobID submitPipelineJob( - TarballFetcher.CdcVersion version, String pipelineJob, Path... jars) throws Exception { - return submitPipelineJob(version, pipelineJob, null, false, jars); - } - - /** Submits a YAML job to the running cluster with latest CDC version. */ - public JobID submitPipelineJob( - String pipelineJob, - @Nullable String savepointPath, - boolean allowNonRestoredState, - Path... jars) - throws Exception { - return submitPipelineJob( - TarballFetcher.CdcVersion.SNAPSHOT, - pipelineJob, - savepointPath, - allowNonRestoredState, - jars); - } - - public JobID submitPipelineJob( - TarballFetcher.CdcVersion version, - String pipelineJob, - @Nullable String savepointPath, - boolean allowNonRestoredState, - Path... jars) - throws Exception { - - // Prepare external JAR dependencies - List<Path> paths = new ArrayList<>(Arrays.asList(jars)); - List<String> containerPaths = new ArrayList<>(); - paths.add(TestUtils.getResource("mysql-driver.jar")); - - for (Path jar : paths) { - String containerPath = version.workDir() + "/lib/" + jar.getFileName(); - jobManager.copyFileToContainer(MountableFile.forHostPath(jar), containerPath); - containerPaths.add(containerPath); - } - - containerPaths.add(version.workDir() + "/lib/values-cdc-pipeline-connector.jar"); - - StringBuilder sb = new StringBuilder(); - for (String containerPath : containerPaths) { - sb.append(" --jar ").append(containerPath); - } - - jobManager.copyFileToContainer( - Transferable.of(pipelineJob), version.workDir() + "/conf/pipeline.yaml"); - - String commands = - version.workDir() - + "/bin/flink-cdc.sh " - + version.workDir() - + "/conf/pipeline.yaml --flink-home /opt/flink" - + sb; - - if (savepointPath != null) { - commands += " --from-savepoint " + savepointPath; - if (allowNonRestoredState) { - commands += " --allow-nonRestored-state"; - } - } - LOG.info("Execute command: {}", commands); - ExecResult execResult = executeAndCheck(jobManager, commands); - return Arrays.stream(execResult.getStdout().split("\n")) - .filter(line -> line.startsWith("Job ID: ")) - .findFirst() - .map(line -> line.split(": ")[1]) - .map(JobID::fromHexString) - .orElse(null); - } - - public String stopJobWithSavepoint(JobID jobID) { - String savepointPath = "/opt/flink/"; - ExecResult result = - executeAndCheck( - jobManager, - "flink", - "stop", - jobID.toHexString(), - "--savepointPath", - savepointPath); - - return Arrays.stream(result.getStdout().split("\n")) - .filter(line -> line.startsWith("Savepoint completed.")) - .findFirst() - .map(line -> line.split("Path: file:")[1]) - .orElseThrow( - () -> new RuntimeException("Failed to parse savepoint path from stdout.")); - } - - public void cancelJob(JobID jobID) { - executeAndCheck(jobManager, "flink", "cancel", jobID.toHexString()); - } - - /** - * Get {@link RestClusterClient} connected to this FlinkContainer. - * - * <p>This method lazily initializes the REST client on-demand. - */ - public RestClusterClient<StandaloneClusterId> getRestClusterClient() { - if (restClusterClient != null) { - return restClusterClient; - } - checkState( - jobManager.isRunning(), - "Cluster client should only be retrieved for a running cluster"); - try { - final Configuration clientConfiguration = new Configuration(); - clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost()); - clientConfiguration.set( - RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT)); - this.restClusterClient = - new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance()); - } catch (Exception e) { - throw new IllegalStateException( - "Failed to create client for Flink container cluster", e); - } - return restClusterClient; - } - - public void waitUntilJobRunning(Duration timeout) { - waitUntilJobState(timeout, JobStatus.RUNNING); - } - - public void waitUntilJobFinished(Duration timeout) { - waitUntilJobState(timeout, JobStatus.FINISHED); - } - - public void waitUntilJobState(Duration timeout, JobStatus expectedStatus) { - RestClusterClient<?> clusterClient = getRestClusterClient(); - Deadline deadline = Deadline.fromNow(timeout); - while (deadline.hasTimeLeft()) { - Collection<JobStatusMessage> jobStatusMessages; - try { - jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS); - } catch (Exception e) { - LOG.warn("Error when fetching job status.", e); - continue; - } - if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) { - JobStatusMessage message = jobStatusMessages.iterator().next(); - JobStatus jobStatus = message.getJobState(); - if (!expectedStatus.isTerminalState() && jobStatus.isTerminalState()) { - throw new ValidationException( - String.format( - "Job has been terminated! JobName: %s, JobID: %s, Status: %s", - message.getJobName(), - message.getJobId(), - message.getJobState())); - } else if (jobStatus == expectedStatus) { - return; - } - } - } - } - - protected String getFlinkDockerImageTag() { - if (System.getProperty("java.specification.version").equals("17")) { - return String.format("flink:%s-scala_2.12-java17", flinkVersion); - } - return String.format("flink:%s-scala_2.12-java11", flinkVersion); - } - - private ExecResult executeAndCheck(GenericContainer<?> container, String... command) { - String joinedCommand = String.join(" ", command); - try { - LOG.info("Executing command {}", joinedCommand); - ExecResult execResult = - container.execInContainer("bash", "-c", String.join(" ", command)); - LOG.info(execResult.getStdout()); - if (execResult.getExitCode() == 0) { - LOG.info("Command executed successfully."); - return execResult; - } else { - LOG.error(execResult.getStderr()); - throw new AssertionError( - "Failed when submitting the pipeline job.\n" - + "Exit code: " - + execResult.getExitCode() - + "\n" - + "StdOut: " - + execResult.getStdout() - + "\n" - + "StdErr: " - + execResult.getStderr()); - } - } catch (Exception e) { - throw new RuntimeException( - "Failed to execute command " + joinedCommand + " in container " + container); - } - } - - public void runInContainerAsRoot(GenericContainer<?> container, String... command) - throws InterruptedException { - ToStringConsumer stdoutConsumer = new ToStringConsumer(); - ToStringConsumer stderrConsumer = new ToStringConsumer(); - DockerClient dockerClient = DockerClientFactory.instance().client(); - ExecCreateCmdResponse execCreateCmdResponse = - dockerClient - .execCreateCmd(container.getContainerId()) - .withUser("root") - .withCmd(command) - .exec(); - FrameConsumerResultCallback callback = new FrameConsumerResultCallback(); - callback.addConsumer(OutputFrame.OutputType.STDOUT, stdoutConsumer); - callback.addConsumer(OutputFrame.OutputType.STDERR, stderrConsumer); - dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(callback).awaitCompletion(); - } - - protected List<String> readLines(String resource) throws IOException { - final URL url = PipelineTestEnvironment.class.getClassLoader().getResource(resource); - assert url != null; - Path path = new File(url.getFile()).toPath(); - return Files.readAllLines(path); - } - - protected void validateResult(String... expectedEvents) throws Exception { - validateResult(Function.identity(), expectedEvents); - } - - protected void validateResult(Function<String, String> mapper, String... expectedEvents) - throws Exception { - validateResult( - taskManagerConsumer, Stream.of(expectedEvents).map(mapper).toArray(String[]::new)); - } - - protected void validateResult(ToStringConsumer consumer, String... expectedEvents) - throws Exception { - for (String event : expectedEvents) { - waitUntilSpecificEvent(consumer, event); - } - } - - protected void validateResult( - ToStringConsumer consumer, Function<String, String> mapper, String... expectedEvents) - throws Exception { - validateResult(consumer, Stream.of(expectedEvents).map(mapper).toArray(String[]::new)); - } - - protected void waitUntilSpecificEvent(String event) throws Exception { - waitUntilSpecificEvent(taskManagerConsumer, event); - } - - protected void waitUntilSpecificEvent(ToStringConsumer consumer, String event) - throws Exception { - boolean result = false; - long endTimeout = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis(); - while (System.currentTimeMillis() < endTimeout) { - String stdout = consumer.toUtf8String(); - if (stdout.contains(event + "\n")) { - result = true; - break; - } - Thread.sleep(1000); - } - if (!result) { - throw new TimeoutException( - "failed to get specific event: " - + event - + " from stdout: " - + consumer.toUtf8String()); - } - } -} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java deleted file mode 100644 index 8bd16d805..000000000 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.pipeline.tests.utils; - -import org.apache.flink.cdc.common.test.utils.TestUtils; - -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.utility.MountableFile; - -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.Arrays; -import java.util.List; - -/** Obtain and downloads corresponding Flink CDC tarball files. */ -public abstract class TarballFetcher { - - private static final Logger LOG = LoggerFactory.getLogger(TarballFetcher.class); - - public static void fetchAll(GenericContainer<?> container) throws Exception { - fetch(container, CdcVersion.values()); - } - - public static void fetchLatest(GenericContainer<?> container) throws Exception { - fetch(container, CdcVersion.SNAPSHOT); - } - - public static void fetch(GenericContainer<?> container, CdcVersion... versions) - throws Exception { - for (CdcVersion version : versions) { - TarballFetcher.fetchInternal(container, version); - } - } - - private static void fetchInternal(GenericContainer<?> container, CdcVersion version) - throws Exception { - LOG.info("Trying to download CDC tarball @ {}...", version); - if (CdcVersion.SNAPSHOT.equals(version)) { - LOG.info("CDC {} is a snapshot version, we should fetch it locally...", version); - - container.copyFileToContainer( - MountableFile.forHostPath( - TestUtils.getResource("flink-cdc.sh", "flink-cdc-dist", "src"), 0755), - version.workDir() + "/bin/flink-cdc.sh"); - container.copyFileToContainer( - MountableFile.forHostPath( - TestUtils.getResource("flink-cdc.yaml", "flink-cdc-dist", "src"), 0755), - version.workDir() + "/conf/flink-cdc.yaml"); - container.copyFileToContainer( - MountableFile.forHostPath(TestUtils.getResource("flink-cdc-dist.jar")), - version.workDir() + "/lib/flink-cdc-dist.jar"); - container.copyFileToContainer( - MountableFile.forHostPath( - TestUtils.getResource("values-cdc-pipeline-connector.jar")), - version.workDir() + "/lib/values-cdc-pipeline-connector.jar"); - - } else { - LOG.info("CDC {} is a released version, download it from the Internet...", version); - - String containerPath = "/tmp/tarball/" + version.getVersion() + ".tar.gz"; - downloadAndCopyToContainer(container, version.tarballUrl(), containerPath); - container.execInContainer("mkdir", "-p", version.workDir()); - container.execInContainer( - "tar", "-xzvf", containerPath, "-C", version.workDir(), "--strip-components=1"); - - downloadAndCopyToContainer( - container, - version.connectorJarUrl("values"), - version.workDir() + "/lib/values-cdc-pipeline-connector.jar"); - } - } - - private static void downloadAndCopyToContainer( - GenericContainer<?> container, String url, String containerPath) throws Exception { - Path tempFile = Files.createTempFile("download-", ".tmp"); - FileUtils.copyURLToFile( - new URL(url), - tempFile.toFile(), - (int) Duration.ofMinutes(1).toMillis(), - (int) Duration.ofMinutes(5).toMillis()); - container.copyFileToContainer(MountableFile.forHostPath(tempFile), containerPath); - } - - /** Enum for all released Flink CDC version tags. */ - public enum CdcVersion { - V3_1_1("3.1.1"), - V3_2_0("3.2.0"), - V3_2_1("3.2.1"), - V3_3_0("3.3.0"), - V3_4_0("3.4.0"), - V3_5_0("3.5.0"), - SNAPSHOT("SNAPSHOT"); - - private final String version; - - CdcVersion(String version) { - this.version = version; - } - - public String getVersion() { - return version; - } - - public static List<CdcVersion> getAllVersions() { - return Arrays.asList(CdcVersion.values()); - } - - public static List<CdcVersion> getVersionsSince(CdcVersion version) { - return getAllVersions() - .subList(getAllVersions().indexOf(version), getAllVersions().size()); - } - - public String tarballUrl() { - return "https://dlcdn.apache.org/flink/flink-cdc-" - + version - + "/flink-cdc-" - + version - + "-bin.tar.gz"; - } - - public String workDir() { - return "/tmp/cdc/" + version; - } - - public String connectorJarUrl(String name) { - return String.format( - "https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-%s/%s/flink-cdc-pipeline-connector-%s-%s.jar", - name, version, name, version); - } - } -} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/my.cnf b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/my.cnf deleted file mode 100644 index 26d1abad6..000000000 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/my.cnf +++ /dev/null @@ -1,64 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# For advice on how to change settings please see -# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html - -[mysqld] -# -# Remove leading # and set to the amount of RAM for the most important data -# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. -# innodb_buffer_pool_size = 128M -# -# Remove leading # to turn on a very important data integrity option: logging -# changes to the binary log between backups. -# log_bin -# -# Remove leading # to set options mainly useful for reporting servers. -# The server defaults are faster for transactions and fast SELECTs. -# Adjust sizes as needed, experiment to find the optimal values. -# join_buffer_size = 128M -# sort_buffer_size = 2M -# read_rnd_buffer_size = 2M -skip-host-cache -skip-name-resolve -#datadir=/var/lib/mysql -#socket=/var/lib/mysql/mysql.sock -#secure-file-priv=/var/lib/mysql-files -secure-file-priv=/var/lib/mysql -user=mysql - -# Disabling symbolic-links is recommended to prevent assorted security risks -symbolic-links=0 - -#log-error=/var/log/mysqld.log -#pid-file=/var/run/mysqld/mysqld.pid - -# ---------------------------------------------- -# Enable the binlog for replication & CDC -# ---------------------------------------------- - -# Enable binary replication log and set the prefix, expiration, and log format. -# The prefix is arbitrary, expiration can be short for integration tests but would -# be longer on a production system. Row-level info is required for ingest to work. -# Server ID is required, but this will vary on production systems -server-id = 223344 -log_bin = mysql-bin -expire_logs_days = 1 -binlog_format = row - -# enable gtid mode -gtid_mode = on -enforce_gtid_consistency = on \ No newline at end of file diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/setup.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/setup.sql deleted file mode 100644 index 3c0cccb4b..000000000 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/setup.sql +++ /dev/null @@ -1,30 +0,0 @@ --- Licensed to the Apache Software Foundation (ASF) under one or more --- contributor license agreements. See the NOTICE file distributed with --- this work for additional information regarding copyright ownership. --- The ASF licenses this file to You under the Apache License, Version 2.0 --- (the "License"); you may not use this file except in compliance with --- the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. - --- In production you would almost certainly limit the replication user must be on the follower (slave) machine, --- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'. --- However, in this database we'll grant 2 users different privileges: --- --- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing) --- 2) 'mysqluser' - all privileges --- -GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%'; -CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw'; -GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%'; - --- ---------------------------------------------------------------------------------------------------------------- --- DATABASE: emptydb --- ---------------------------------------------------------------------------------------------------------------- -CREATE DATABASE emptydb; diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/log4j2-test.properties b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/log4j2-test.properties deleted file mode 100644 index 32df1c025..000000000 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/log4j2-test.properties +++ /dev/null @@ -1,25 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Set root logger level to ERROR to not flood build logs -# set manually to INFO for debugging purposes -rootLogger.level=ERROR -rootLogger.appenderRef.test.ref = TestLogger - -appender.testlogger.name = TestLogger -appender.testlogger.type = CONSOLE -appender.testlogger.target = SYSTEM_ERR -appender.testlogger.layout.type = PatternLayout -appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index 8c263f8de..897d1acc3 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -805,4 +805,43 @@ limitations under the License. </plugin> </plugins> </build> + + <profiles> + <profile> + <id>flink2</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>default-test</id> + <phase>none</phase> + </execution> + <execution> + <id>integration-tests</id> + <phase>none</phase> + </execution> + <execution> + <id>end-to-end-tests</id> + <configuration> + <includes> + <include>**/ValuesE2eITCase.java</include> + </includes> + <excludes> + <exclude>**/MysqlE2eWithYarnApplicationITCase.java</exclude> + </excludes> + </configuration> + </execution> + <execution> + <id>run-last-test</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java similarity index 97% rename from flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java rename to flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java index 7d863333d..0f7bc28cb 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java @@ -47,7 +47,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment { submitPipelineJob(pipelineJob); waitUntilJobFinished(Duration.ofSeconds(30)); - LOG.info("Pipeline job is running"); + LOG.info("Pipeline job has finished"); validateResult( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", @@ -79,7 +79,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment { submitPipelineJob(pipelineJob); waitUntilJobFinished(Duration.ofSeconds(30)); - LOG.info("Pipeline job is running"); + LOG.info("Pipeline job has finished"); validateResult( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", @@ -115,7 +115,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment { submitPipelineJob(pipelineJob); waitUntilJobFinished(Duration.ofSeconds(30)); - LOG.info("Pipeline job is running"); + LOG.info("Pipeline job has finished"); validateResult( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", @@ -151,7 +151,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment { submitPipelineJob(pipelineJob); waitUntilJobFinished(Duration.ofSeconds(30)); - LOG.info("Pipeline job is running"); + LOG.info("Pipeline job has finished"); validateResult( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", @@ -186,7 +186,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment { submitPipelineJob(pipelineJob); waitUntilJobFinished(Duration.ofSeconds(30)); - LOG.info("Pipeline job is running"); + LOG.info("Pipeline job has finished"); validateResult( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", @@ -214,7 +214,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment { submitPipelineJob(pipelineJob); waitUntilJobFinished(Duration.ofSeconds(30)); - LOG.info("Pipeline job is running"); + LOG.info("Pipeline job has finished"); validateResult( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", @@ -246,7 +246,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment { submitPipelineJob(pipelineJob); waitUntilJobFinished(Duration.ofSeconds(30)); - LOG.info("Pipeline job is running"); + LOG.info("Pipeline job has finished"); validateResult( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java index 1d5788918..450f683d6 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java @@ -19,10 +19,16 @@ package org.apache.flink.cdc.pipeline.tests.utils; import org.apache.flink.cdc.common.test.utils.TestUtils; +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.ExecCreateCmdResponse; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.FrameConsumerResultCallback; +import org.testcontainers.containers.output.OutputFrame; +import org.testcontainers.containers.output.ToStringConsumer; import org.testcontainers.utility.MountableFile; import java.net.URL; @@ -60,11 +66,11 @@ public abstract class TarballFetcher { container.copyFileToContainer( MountableFile.forHostPath( - TestUtils.getResource("flink-cdc.sh", "flink-cdc-dist", "src"), 755), + TestUtils.getResource("flink-cdc.sh", "flink-cdc-dist", "src")), version.workDir() + "/bin/flink-cdc.sh"); container.copyFileToContainer( MountableFile.forHostPath( - TestUtils.getResource("flink-cdc.yaml", "flink-cdc-dist", "src"), 755), + TestUtils.getResource("flink-cdc.yaml", "flink-cdc-dist", "src")), version.workDir() + "/conf/flink-cdc.yaml"); container.copyFileToContainer( MountableFile.forHostPath(TestUtils.getResource("flink-cdc-dist.jar")), @@ -78,6 +84,9 @@ public abstract class TarballFetcher { TestUtils.getResource("values-cdc-pipeline-connector.jar")), version.workDir() + "/lib/values-cdc-pipeline-connector.jar"); + // Ensure the script has execute permission + runInContainerAsRoot(container, "chmod", "+x", version.workDir() + "/bin/flink-cdc.sh"); + } else { LOG.info("CDC {} is a released version, download it from the Internet...", version); @@ -109,6 +118,23 @@ public abstract class TarballFetcher { container.copyFileToContainer(MountableFile.forHostPath(tempFile), containerPath); } + private static void runInContainerAsRoot(GenericContainer<?> container, String... command) + throws InterruptedException { + ToStringConsumer stdoutConsumer = new ToStringConsumer(); + ToStringConsumer stderrConsumer = new ToStringConsumer(); + DockerClient dockerClient = DockerClientFactory.instance().client(); + ExecCreateCmdResponse execCreateCmdResponse = + dockerClient + .execCreateCmd(container.getContainerId()) + .withUser("root") + .withCmd(command) + .exec(); + FrameConsumerResultCallback callback = new FrameConsumerResultCallback(); + callback.addConsumer(OutputFrame.OutputType.STDOUT, stdoutConsumer); + callback.addConsumer(OutputFrame.OutputType.STDERR, stderrConsumer); + dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(callback).awaitCompletion(); + } + /** Enum for all released Flink CDC version tags. */ public enum CdcVersion { V3_1_1("3.1.1"), diff --git a/flink-cdc-e2e-tests/pom.xml b/flink-cdc-e2e-tests/pom.xml index 155c6b735..28d16beee 100644 --- a/flink-cdc-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/pom.xml @@ -33,7 +33,6 @@ limitations under the License. <module>flink-cdc-e2e-utils</module> <module>flink-cdc-source-e2e-tests</module> <module>flink-cdc-pipeline-e2e-tests</module> - <module>flink-cdc-pipeline-e2e-tests-2.x</module> </modules> <build> diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java index 6ea1ceaee..e89f3f814 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java @@ -180,24 +180,26 @@ public class BatchDataSinkWriterOperator<CommT> } private Object createFlinkWriterOperator() { + Class<?> flinkWriterClass; try { - Class<?> flinkWriterClass = + flinkWriterClass = getRuntimeContext() .getUserCodeClassLoader() .loadClass( "org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to load SinkWriterOperator class", e); + } + + try { Constructor<?> constructor = flinkWriterClass.getDeclaredConstructor( Sink.class, ProcessingTimeService.class, MailboxExecutor.class); constructor.setAccessible(true); return constructor.newInstance(sink, processingTimeService, mailboxExecutor); - } catch (Exception ignore) { + } catch (NoSuchMethodException e) { + // Constructor with 3 parameters not found, try the 4-parameter version try { - Class<?> flinkWriterClass = - getRuntimeContext() - .getUserCodeClassLoader() - .loadClass( - "org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator"); Constructor<?> constructor = flinkWriterClass.getDeclaredConstructor( StreamOperatorParameters.class, @@ -206,9 +208,12 @@ public class BatchDataSinkWriterOperator<CommT> MailboxExecutor.class); constructor.setAccessible(true); return constructor.newInstance(null, sink, processingTimeService, mailboxExecutor); - } catch (Exception e) { - throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e); + } catch (Exception ex) { + throw new RuntimeException("Failed to create SinkWriterOperator in Flink", ex); } + } catch (Exception e) { + // Other exceptions (e.g., InvocationTargetException) indicate real failures + throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java index 0c45b57c2..bac2929ce 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java @@ -265,24 +265,26 @@ public class DataSinkWriterOperator<CommT> } private Object createFlinkWriterOperator() { + Class<?> flinkWriterClass; try { - Class<?> flinkWriterClass = + flinkWriterClass = getRuntimeContext() .getUserCodeClassLoader() .loadClass( "org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to load SinkWriterOperator class", e); + } + + try { Constructor<?> constructor = flinkWriterClass.getDeclaredConstructor( Sink.class, ProcessingTimeService.class, MailboxExecutor.class); constructor.setAccessible(true); return constructor.newInstance(sink, processingTimeService, mailboxExecutor); - } catch (Exception ignore) { + } catch (NoSuchMethodException e) { + // Constructor with 3 parameters not found, try the 4-parameter version try { - Class<?> flinkWriterClass = - getRuntimeContext() - .getUserCodeClassLoader() - .loadClass( - "org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator"); Constructor<?> constructor = flinkWriterClass.getDeclaredConstructor( StreamOperatorParameters.class, @@ -291,9 +293,12 @@ public class DataSinkWriterOperator<CommT> MailboxExecutor.class); constructor.setAccessible(true); return constructor.newInstance(null, sink, processingTimeService, mailboxExecutor); - } catch (Exception e) { - throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e); + } catch (Exception ex) { + throw new RuntimeException("Failed to create SinkWriterOperator in Flink", ex); } + } catch (Exception e) { + // Other exceptions (e.g., InvocationTargetException) indicate real failures + throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e); } }
