This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch FLINK-38729-2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/FLINK-38729-2 by this push:
new 41172a30f Remove flink-cdc-pipeline-e2e-tests-2.x module.
41172a30f is described below
commit 41172a30f6867caff334f4ea2f2d4b200a79b193
Author: lvyanquan <[email protected]>
AuthorDate: Wed Mar 11 16:51:57 2026 +0800
Remove flink-cdc-pipeline-e2e-tests-2.x module.
---
.github/workflows/flink_cdc_ci.yml | 2 +-
.github/workflows/modules.py | 11 +-
.../flink-cdc-pipeline-e2e-tests-2.x/pom.xml | 454 ------------------
.../tests/utils/PipelineTestEnvironment.java | 505 ---------------------
.../cdc/pipeline/tests/utils/TarballFetcher.java | 151 ------
.../src/test/resources/docker/mysql/my.cnf | 64 ---
.../src/test/resources/docker/mysql/setup.sql | 30 --
.../src/test/resources/log4j2-test.properties | 25 -
.../flink-cdc-pipeline-e2e-tests/pom.xml | 39 ++
.../flink/cdc/pipeline/tests/ValuesE2eITCase.java | 14 +-
.../cdc/pipeline/tests/utils/TarballFetcher.java | 30 +-
flink-cdc-e2e-tests/pom.xml | 1 -
.../sink/BatchDataSinkWriterOperator.java | 23 +-
.../operators/sink/DataSinkWriterOperator.java | 23 +-
14 files changed, 104 insertions(+), 1268 deletions(-)
diff --git a/.github/workflows/flink_cdc_ci.yml
b/.github/workflows/flink_cdc_ci.yml
index 66d2f71be..f7514f7e0 100644
--- a/.github/workflows/flink_cdc_ci.yml
+++ b/.github/workflows/flink_cdc_ci.yml
@@ -108,7 +108,7 @@ jobs:
java-versions: "[11]"
flink-versions: "['2.2.0']"
custom-maven-parameter: "-Pflink2"
- modules: "['pipeline_e2e_2.x']"
+ modules: "['pipeline_e2e']"
parallelism: ${{ matrix.parallelism }}
source_e2e:
name: Source E2E Tests
diff --git a/.github/workflows/modules.py b/.github/workflows/modules.py
index d06c36d02..86e94164f 100755
--- a/.github/workflows/modules.py
+++ b/.github/workflows/modules.py
@@ -144,10 +144,6 @@ MODULES_PIPELINE_E2E = [
"flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests"
]
-MODULES_PIPELINE_E2E_2_X = [
- "flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x"
-]
-
MODULES_SOURCE_E2E = [
"flink-cdc-e2e-tests/flink-cdc-source-e2e-tests"
]
@@ -182,18 +178,13 @@ ALL_MODULES = set(
MODULES_SOURCE_E2E
)
-# Modules that require the flink2 Maven profile to be activated
-ALL_MODULES_FLINK2 = ALL_MODULES | set(MODULES_PIPELINE_E2E_2_X)
-
test_modules = set()
compile_modules = set()
for module in INPUT_MODULES.split(', '):
module_list = set(globals()['MODULES_' + module.upper().replace('-',
'_').replace('.', '_')])
test_modules |= module_list
- if module == 'pipeline_e2e_2.x':
- compile_modules |= ALL_MODULES_FLINK2
- elif module == 'source_e2e' or module == 'pipeline_e2e':
+ if module == 'source_e2e' or module == 'pipeline_e2e':
compile_modules |= ALL_MODULES
else:
compile_modules |= module_list
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/pom.xml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/pom.xml
deleted file mode 100644
index 2d2a11259..000000000
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/pom.xml
+++ /dev/null
@@ -1,454 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>flink-cdc-e2e-tests</artifactId>
- <groupId>org.apache.flink</groupId>
- <version>${revision}</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>flink-cdc-pipeline-e2e-tests-2.x</artifactId>
-
- <properties>
- <flink.version>${flink.2.x.version}</flink.version>
- <mysql.driver.version>8.0.27</mysql.driver.version>
- <flink.release.download.skip>true</flink.release.download.skip>
- <skipITs>true</skipITs>
-
<flink.release.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.release.name>
-
<flink.release.mirror>https://dlcdn.apache.org/flink/flink-${flink.version}</flink.release.mirror>
- <maven.plugin.download.version>1.6.8</maven.plugin.download.version>
- <jmh.version>1.37</jmh.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-cdc-e2e-utils</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <!-- Drivers -->
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- <version>${mysql.driver.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- CDC connectors test utils - 2.x versions -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-cdc-dist</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-cdc-pipeline-connector-values</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-test-util</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-cdc-pipeline-udf-examples</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- testcontainers -->
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>testcontainers</artifactId>
- <version>${testcontainers.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>junit-jupiter</artifactId>
- <version>${testcontainers.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>mysql</artifactId>
- <version>${testcontainers.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- benchmark -->
- <dependency>
- <groupId>org.openjdk.jmh</groupId>
- <artifactId>jmh-core</artifactId>
- <version>${jmh.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.openjdk.jmh</groupId>
- <artifactId>jmh-generator-annprocess</artifactId>
- <version>${jmh.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- test dependencies on TestContainers -->
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>postgresql</artifactId>
- <version>${testcontainers.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- This is for testing Scala UDF.-->
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>kafka</artifactId>
- <version>${testcontainers.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- mini yarn -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <!-- This dependency is no longer shipped with the JDK
since Java 9.-->
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>ch.qos.reload4j</groupId>
- <artifactId>reload4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-reload4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <artifactId>jdk.tools</artifactId>
- <groupId>jdk.tools</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-tests</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- <exclusions>
- <exclusion>
- <artifactId>jdk.tools</artifactId>
- <groupId>jdk.tools</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- <exclusions>
- <exclusion>
- <artifactId>jdk.tools</artifactId>
- <groupId>jdk.tools</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-yarn</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <testResources>
- <testResource>
- <directory>src/test/resources</directory>
- <excludes>
- <exclude>**/flink-cdc.sh</exclude>
- <exclude>**/flink-cdc.yaml</exclude>
- </excludes>
- </testResource>
- </testResources>
- <plugins>
- <!--
- Skip tests in the test phase.
- Tests will run in the integration-test phase after shading is
complete.
- -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
-
- <!--
- Configure failsafe to run integration tests AFTER the package
phase.
- This ensures tests use the shaded artifacts where Guava and
Flink APIs
- have been relocated.
- -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <version>3.0.0-M5</version>
- <executions>
- <execution>
- <id>end-to-end-tests</id>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- <configuration>
- <includes>
- <include>**/*.*</include>
- </includes>
- <excludes>
-
<exclude>**/MysqlE2eWithYarnApplicationITCase.java</exclude>
- </excludes>
- <forkCount>1</forkCount>
- <systemPropertyVariables>
- <moduleDir>${project.basedir}</moduleDir>
- </systemPropertyVariables>
- </configuration>
- </execution>
- <execution>
- <id>run-last-test</id>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- <configuration>
- <includes>
-
<include>**/MysqlE2eWithYarnApplicationITCase.java</include>
- </includes>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>com.googlecode.maven-download-plugin</groupId>
- <artifactId>download-maven-plugin</artifactId>
- <version>1.6.8</version>
- <configuration>
-
<cacheDirectory>${maven.plugin.download.cache.path}</cacheDirectory>
-
<outputDirectory>${project.build.directory}</outputDirectory>
- <readTimeOut>60000</readTimeOut>
- <retries>3</retries>
- <unpack>true</unpack>
- </configuration>
- <executions>
- <execution>
- <id>download-flink-release</id>
- <goals>
- <goal>wget</goal>
- </goals>
- <phase>test</phase>
- <configuration>
- <skip>${flink.release.download.skip}</skip>
-
<url>${flink.release.mirror}/${flink.release.name}</url>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-jars</id>
- <phase>test</phase>
- <goals>
- <goal>copy</goal>
- </goals>
- </execution>
- <execution>
- <id>store-classpath-in-target-for-tests</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
-
<outputFile>${project.build.directory}/yarn.classpath</outputFile>
- <excludeGroupIds>org.apache.flink</excludeGroupIds>
- </configuration>
- </execution>
- </executions>
- <configuration>
- <artifactItems>
-
- <artifactItem>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>${mysql.driver.version}</version>
- <destFileName>mysql-driver.jar</destFileName>
- <type>jar</type>
-
<outputDirectory>${project.build.directory}/dependencies
- </outputDirectory>
- </artifactItem>
-
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-cdc-dist</artifactId>
- <version>${project.version}</version>
- <destFileName>flink-cdc-dist.jar</destFileName>
- <type>jar</type>
-
<outputDirectory>${project.build.directory}/dependencies
- </outputDirectory>
- </artifactItem>
-
- <artifactItem>
- <groupId>org.apache.flink</groupId>
-
<artifactId>flink-cdc-pipeline-connector-values</artifactId>
- <version>${project.version}</version>
-
<destFileName>values-cdc-pipeline-connector.jar</destFileName>
- <type>jar</type>
-
<outputDirectory>${project.build.directory}/dependencies
- </outputDirectory>
- </artifactItem>
-
- <artifactItem>
- <groupId>org.apache.flink</groupId>
-
<artifactId>flink-cdc-pipeline-udf-examples</artifactId>
- <version>${project.version}</version>
- <destFileName>udf-examples.jar</destFileName>
- <type>jar</type>
-
<outputDirectory>${project.build.directory}/dependencies
- </outputDirectory>
- </artifactItem>
-
- <artifactItem>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- <destFileName>scala-library.jar</destFileName>
- <type>jar</type>
-
<outputDirectory>${project.build.directory}/dependencies
- </outputDirectory>
- </artifactItem>
- </artifactItems>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <profiles>
- <profile>
- <id>flink2</id>
- <properties>
-
<flink.release.download.skip>false</flink.release.download.skip>
- <skipITs>false</skipITs>
- </properties>
- </profile>
- </profiles>
-</project>
\ No newline at end of file
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
deleted file mode 100644
index b1d57c778..000000000
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.pipeline.tests.utils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.cdc.common.test.utils.TestUtils;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
-import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.util.TestLogger;
-
-import com.github.dockerjava.api.DockerClient;
-import com.github.dockerjava.api.command.ExecCreateCmdResponse;
-import com.github.dockerjava.api.model.Volume;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.DockerClientFactory;
-import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.FrameConsumerResultCallback;
-import org.testcontainers.containers.output.OutputFrame;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.output.ToStringConsumer;
-import org.testcontainers.images.builder.Transferable;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
-import java.util.stream.Stream;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/** Test environment running pipeline job on Flink containers. */
-@Testcontainers
-public abstract class PipelineTestEnvironment extends TestLogger {
-
- private static final Logger LOG =
LoggerFactory.getLogger(PipelineTestEnvironment.class);
-
- protected Integer parallelism = getParallelism();
-
- private int getParallelism() {
- try {
- return
Integer.parseInt(System.getProperty("specifiedParallelism"));
- } catch (NumberFormatException ex) {
- LOG.warn(
- "Unable to parse specified parallelism configuration ({}
provided). Use 4 by default.",
- System.getProperty("specifiedParallelism"));
- return 4;
- }
- }
-
- //
------------------------------------------------------------------------------------------
- // MySQL Variables (we always use MySQL as the data source for easier
verifying)
- //
------------------------------------------------------------------------------------------
- protected static final String MYSQL_TEST_USER = "mysqluser";
- protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
- protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
- protected static final Duration EVENT_WAITING_TIMEOUT =
Duration.ofMinutes(3);
- protected static final Duration STARTUP_WAITING_TIMEOUT =
Duration.ofMinutes(5);
-
- public static final Network NETWORK = Network.newNetwork();
-
- @Container
- protected static final MySqlContainer MYSQL =
- (MySqlContainer)
- new MySqlContainer(MySqlVersion.V8_0)
- .withConfigurationOverride("docker/mysql/my.cnf")
- .withSetupSQL("docker/mysql/setup.sql")
- .withDatabaseName("flink-test")
- .withUsername("flinkuser")
- .withPassword("flinkpw")
- .withNetwork(NETWORK)
- .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
- .withLogConsumer(new Slf4jLogConsumer(LOG));
-
- //
------------------------------------------------------------------------------------------
- // Flink Variables
- //
------------------------------------------------------------------------------------------
- protected static final int JOB_MANAGER_REST_PORT = 8081;
- protected static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
- protected static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
- protected static final List<String> EXTERNAL_PROPS =
- Arrays.asList(
- String.format("jobmanager.rpc.address: %s",
INTER_CONTAINER_JM_ALIAS),
- "jobmanager.bind-host: 0.0.0.0",
- "taskmanager.bind-host: 0.0.0.0",
- "rest.bind-address: 0.0.0.0",
- "rest.address: 0.0.0.0",
- "jobmanager.memory.process.size: 1GB",
- "query.server.port: 6125",
- "blob.server.port: 6124",
- "taskmanager.numberOfTaskSlots: 10",
- "parallelism.default: 4",
- "execution.checkpointing.interval: 300",
- "state.backend.type: hashmap",
- "env.java.default-opts.all:
--add-exports=java.base/sun.net.util=ALL-UNNAMED
--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
--add-exports=java.sec [...]
- "execution.checkpointing.savepoint-dir: file:///opt/flink",
- "restart-strategy.type: off",
- "pekko.ask.timeout: 60s",
- // Set off-heap memory explicitly to avoid
"java.lang.OutOfMemoryError: Direct
- // buffer memory" error.
- "taskmanager.memory.task.off-heap.size: 128mb",
- // Fix `java.lang.OutOfMemoryError: Metaspace. The
metaspace out-of-memory error
- // has occurred` error.
- "taskmanager.memory.jvm-metaspace.size: 512mb");
- public static final String FLINK_PROPERTIES = String.join("\n",
EXTERNAL_PROPS);
-
- @Nullable protected RestClusterClient<StandaloneClusterId>
restClusterClient;
-
- protected GenericContainer<?> jobManager;
- protected GenericContainer<?> taskManager;
- protected Volume sharedVolume = new Volume("/tmp/shared");
-
- protected ToStringConsumer jobManagerConsumer;
-
- protected ToStringConsumer taskManagerConsumer;
-
- protected String flinkVersion = getFlinkVersion();
-
- public static String getFlinkVersion() {
- return "2.2.0";
- }
-
- protected List<String> copyJarToFlinkLib() {
- return Collections.emptyList();
- }
-
- @BeforeEach
- public void before() throws Exception {
- LOG.info("Starting containers...");
- jobManagerConsumer = new ToStringConsumer();
- jobManager =
- new GenericContainer<>(getFlinkDockerImageTag())
- .withCommand("jobmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
- .withExposedPorts(JOB_MANAGER_REST_PORT)
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .withCreateContainerCmdModifier(cmd ->
cmd.withVolumes(sharedVolume))
- .withLogConsumer(jobManagerConsumer);
-
- List<String> jarToCopy = copyJarToFlinkLib();
- if (!jarToCopy.isEmpty()) {
- for (String jar : jarToCopy) {
- jobManager.withCopyFileToContainer(
- MountableFile.forHostPath(TestUtils.getResource(jar)),
"/opt/flink/lib/");
- }
- }
-
- Startables.deepStart(Stream.of(jobManager)).join();
- runInContainerAsRoot(jobManager, "chmod", "0777", "-R",
sharedVolume.toString());
- LOG.info("JobManager is started.");
-
- taskManagerConsumer = new ToStringConsumer();
- taskManager =
- new GenericContainer<>(getFlinkDockerImageTag())
- .withCommand("taskmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .dependsOn(jobManager)
- .withVolumesFrom(jobManager, BindMode.READ_WRITE)
- .withLogConsumer(taskManagerConsumer);
- Startables.deepStart(Stream.of(taskManager)).join();
- runInContainerAsRoot(taskManager, "chmod", "0777", "-R",
sharedVolume.toString());
- LOG.info("TaskManager is started.");
-
- TarballFetcher.fetchLatest(jobManager);
- LOG.info("CDC executables deployed.");
- }
-
- @AfterEach
- public void after() {
- if (restClusterClient != null) {
- restClusterClient.close();
- }
- if (jobManager != null) {
- jobManager.stop();
- }
- if (taskManager != null) {
- taskManager.stop();
- }
- }
-
- /**
- * Submits a YAML job to the running cluster with latest CDC version,
without from previous
- * savepoint states.
- */
- public JobID submitPipelineJob(String pipelineJob, Path... jars) throws
Exception {
- return submitPipelineJob(
- TarballFetcher.CdcVersion.SNAPSHOT, pipelineJob, null, false,
jars);
- }
-
- /**
- * Submits a YAML job to the running cluster with specific CDC version,
without from previous
- * savepoint states.
- */
- public JobID submitPipelineJob(
- TarballFetcher.CdcVersion version, String pipelineJob, Path...
jars) throws Exception {
- return submitPipelineJob(version, pipelineJob, null, false, jars);
- }
-
- /** Submits a YAML job to the running cluster with latest CDC version. */
- public JobID submitPipelineJob(
- String pipelineJob,
- @Nullable String savepointPath,
- boolean allowNonRestoredState,
- Path... jars)
- throws Exception {
- return submitPipelineJob(
- TarballFetcher.CdcVersion.SNAPSHOT,
- pipelineJob,
- savepointPath,
- allowNonRestoredState,
- jars);
- }
-
- public JobID submitPipelineJob(
- TarballFetcher.CdcVersion version,
- String pipelineJob,
- @Nullable String savepointPath,
- boolean allowNonRestoredState,
- Path... jars)
- throws Exception {
-
- // Prepare external JAR dependencies
- List<Path> paths = new ArrayList<>(Arrays.asList(jars));
- List<String> containerPaths = new ArrayList<>();
- paths.add(TestUtils.getResource("mysql-driver.jar"));
-
- for (Path jar : paths) {
- String containerPath = version.workDir() + "/lib/" +
jar.getFileName();
- jobManager.copyFileToContainer(MountableFile.forHostPath(jar),
containerPath);
- containerPaths.add(containerPath);
- }
-
- containerPaths.add(version.workDir() +
"/lib/values-cdc-pipeline-connector.jar");
-
- StringBuilder sb = new StringBuilder();
- for (String containerPath : containerPaths) {
- sb.append(" --jar ").append(containerPath);
- }
-
- jobManager.copyFileToContainer(
- Transferable.of(pipelineJob), version.workDir() +
"/conf/pipeline.yaml");
-
- String commands =
- version.workDir()
- + "/bin/flink-cdc.sh "
- + version.workDir()
- + "/conf/pipeline.yaml --flink-home /opt/flink"
- + sb;
-
- if (savepointPath != null) {
- commands += " --from-savepoint " + savepointPath;
- if (allowNonRestoredState) {
- commands += " --allow-nonRestored-state";
- }
- }
- LOG.info("Execute command: {}", commands);
- ExecResult execResult = executeAndCheck(jobManager, commands);
- return Arrays.stream(execResult.getStdout().split("\n"))
- .filter(line -> line.startsWith("Job ID: "))
- .findFirst()
- .map(line -> line.split(": ")[1])
- .map(JobID::fromHexString)
- .orElse(null);
- }
-
- public String stopJobWithSavepoint(JobID jobID) {
- String savepointPath = "/opt/flink/";
- ExecResult result =
- executeAndCheck(
- jobManager,
- "flink",
- "stop",
- jobID.toHexString(),
- "--savepointPath",
- savepointPath);
-
- return Arrays.stream(result.getStdout().split("\n"))
- .filter(line -> line.startsWith("Savepoint completed."))
- .findFirst()
- .map(line -> line.split("Path: file:")[1])
- .orElseThrow(
- () -> new RuntimeException("Failed to parse savepoint
path from stdout."));
- }
-
- public void cancelJob(JobID jobID) {
- executeAndCheck(jobManager, "flink", "cancel", jobID.toHexString());
- }
-
- /**
- * Get {@link RestClusterClient} connected to this FlinkContainer.
- *
- * <p>This method lazily initializes the REST client on-demand.
- */
- public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
- if (restClusterClient != null) {
- return restClusterClient;
- }
- checkState(
- jobManager.isRunning(),
- "Cluster client should only be retrieved for a running
cluster");
- try {
- final Configuration clientConfiguration = new Configuration();
- clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost());
- clientConfiguration.set(
- RestOptions.PORT,
jobManager.getMappedPort(JOB_MANAGER_REST_PORT));
- this.restClusterClient =
- new RestClusterClient<>(clientConfiguration,
StandaloneClusterId.getInstance());
- } catch (Exception e) {
- throw new IllegalStateException(
- "Failed to create client for Flink container cluster", e);
- }
- return restClusterClient;
- }
-
- public void waitUntilJobRunning(Duration timeout) {
- waitUntilJobState(timeout, JobStatus.RUNNING);
- }
-
- public void waitUntilJobFinished(Duration timeout) {
- waitUntilJobState(timeout, JobStatus.FINISHED);
- }
-
- public void waitUntilJobState(Duration timeout, JobStatus expectedStatus) {
- RestClusterClient<?> clusterClient = getRestClusterClient();
- Deadline deadline = Deadline.fromNow(timeout);
- while (deadline.hasTimeLeft()) {
- Collection<JobStatusMessage> jobStatusMessages;
- try {
- jobStatusMessages = clusterClient.listJobs().get(10,
TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.warn("Error when fetching job status.", e);
- continue;
- }
- if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) {
- JobStatusMessage message = jobStatusMessages.iterator().next();
- JobStatus jobStatus = message.getJobState();
- if (!expectedStatus.isTerminalState() &&
jobStatus.isTerminalState()) {
- throw new ValidationException(
- String.format(
- "Job has been terminated! JobName: %s,
JobID: %s, Status: %s",
- message.getJobName(),
- message.getJobId(),
- message.getJobState()));
- } else if (jobStatus == expectedStatus) {
- return;
- }
- }
- }
- }
-
- protected String getFlinkDockerImageTag() {
- if (System.getProperty("java.specification.version").equals("17")) {
- return String.format("flink:%s-scala_2.12-java17", flinkVersion);
- }
- return String.format("flink:%s-scala_2.12-java11", flinkVersion);
- }
-
- private ExecResult executeAndCheck(GenericContainer<?> container,
String... command) {
- String joinedCommand = String.join(" ", command);
- try {
- LOG.info("Executing command {}", joinedCommand);
- ExecResult execResult =
- container.execInContainer("bash", "-c", String.join(" ",
command));
- LOG.info(execResult.getStdout());
- if (execResult.getExitCode() == 0) {
- LOG.info("Command executed successfully.");
- return execResult;
- } else {
- LOG.error(execResult.getStderr());
- throw new AssertionError(
- "Failed when submitting the pipeline job.\n"
- + "Exit code: "
- + execResult.getExitCode()
- + "\n"
- + "StdOut: "
- + execResult.getStdout()
- + "\n"
- + "StdErr: "
- + execResult.getStderr());
- }
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to execute command " + joinedCommand + " in
container " + container);
- }
- }
-
- public void runInContainerAsRoot(GenericContainer<?> container, String...
command)
- throws InterruptedException {
- ToStringConsumer stdoutConsumer = new ToStringConsumer();
- ToStringConsumer stderrConsumer = new ToStringConsumer();
- DockerClient dockerClient = DockerClientFactory.instance().client();
- ExecCreateCmdResponse execCreateCmdResponse =
- dockerClient
- .execCreateCmd(container.getContainerId())
- .withUser("root")
- .withCmd(command)
- .exec();
- FrameConsumerResultCallback callback = new
FrameConsumerResultCallback();
- callback.addConsumer(OutputFrame.OutputType.STDOUT, stdoutConsumer);
- callback.addConsumer(OutputFrame.OutputType.STDERR, stderrConsumer);
-
dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(callback).awaitCompletion();
- }
-
- protected List<String> readLines(String resource) throws IOException {
- final URL url =
PipelineTestEnvironment.class.getClassLoader().getResource(resource);
- assert url != null;
- Path path = new File(url.getFile()).toPath();
- return Files.readAllLines(path);
- }
-
- protected void validateResult(String... expectedEvents) throws Exception {
- validateResult(Function.identity(), expectedEvents);
- }
-
- protected void validateResult(Function<String, String> mapper, String...
expectedEvents)
- throws Exception {
- validateResult(
- taskManagerConsumer,
Stream.of(expectedEvents).map(mapper).toArray(String[]::new));
- }
-
- protected void validateResult(ToStringConsumer consumer, String...
expectedEvents)
- throws Exception {
- for (String event : expectedEvents) {
- waitUntilSpecificEvent(consumer, event);
- }
- }
-
- protected void validateResult(
- ToStringConsumer consumer, Function<String, String> mapper,
String... expectedEvents)
- throws Exception {
- validateResult(consumer,
Stream.of(expectedEvents).map(mapper).toArray(String[]::new));
- }
-
- protected void waitUntilSpecificEvent(String event) throws Exception {
- waitUntilSpecificEvent(taskManagerConsumer, event);
- }
-
- protected void waitUntilSpecificEvent(ToStringConsumer consumer, String
event)
- throws Exception {
- boolean result = false;
- long endTimeout = System.currentTimeMillis() +
EVENT_WAITING_TIMEOUT.toMillis();
- while (System.currentTimeMillis() < endTimeout) {
- String stdout = consumer.toUtf8String();
- if (stdout.contains(event + "\n")) {
- result = true;
- break;
- }
- Thread.sleep(1000);
- }
- if (!result) {
- throw new TimeoutException(
- "failed to get specific event: "
- + event
- + " from stdout: "
- + consumer.toUtf8String());
- }
- }
-}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
deleted file mode 100644
index 8bd16d805..000000000
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.pipeline.tests.utils;
-
-import org.apache.flink.cdc.common.test.utils.TestUtils;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.utility.MountableFile;
-
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-
-/** Obtain and downloads corresponding Flink CDC tarball files. */
-public abstract class TarballFetcher {
-
- private static final Logger LOG =
LoggerFactory.getLogger(TarballFetcher.class);
-
- public static void fetchAll(GenericContainer<?> container) throws
Exception {
- fetch(container, CdcVersion.values());
- }
-
- public static void fetchLatest(GenericContainer<?> container) throws
Exception {
- fetch(container, CdcVersion.SNAPSHOT);
- }
-
- public static void fetch(GenericContainer<?> container, CdcVersion...
versions)
- throws Exception {
- for (CdcVersion version : versions) {
- TarballFetcher.fetchInternal(container, version);
- }
- }
-
- private static void fetchInternal(GenericContainer<?> container,
CdcVersion version)
- throws Exception {
- LOG.info("Trying to download CDC tarball @ {}...", version);
- if (CdcVersion.SNAPSHOT.equals(version)) {
- LOG.info("CDC {} is a snapshot version, we should fetch it
locally...", version);
-
- container.copyFileToContainer(
- MountableFile.forHostPath(
- TestUtils.getResource("flink-cdc.sh",
"flink-cdc-dist", "src"), 0755),
- version.workDir() + "/bin/flink-cdc.sh");
- container.copyFileToContainer(
- MountableFile.forHostPath(
- TestUtils.getResource("flink-cdc.yaml",
"flink-cdc-dist", "src"), 0755),
- version.workDir() + "/conf/flink-cdc.yaml");
- container.copyFileToContainer(
-
MountableFile.forHostPath(TestUtils.getResource("flink-cdc-dist.jar")),
- version.workDir() + "/lib/flink-cdc-dist.jar");
- container.copyFileToContainer(
- MountableFile.forHostPath(
-
TestUtils.getResource("values-cdc-pipeline-connector.jar")),
- version.workDir() +
"/lib/values-cdc-pipeline-connector.jar");
-
- } else {
- LOG.info("CDC {} is a released version, download it from the
Internet...", version);
-
- String containerPath = "/tmp/tarball/" + version.getVersion() +
".tar.gz";
- downloadAndCopyToContainer(container, version.tarballUrl(),
containerPath);
- container.execInContainer("mkdir", "-p", version.workDir());
- container.execInContainer(
- "tar", "-xzvf", containerPath, "-C", version.workDir(),
"--strip-components=1");
-
- downloadAndCopyToContainer(
- container,
- version.connectorJarUrl("values"),
- version.workDir() +
"/lib/values-cdc-pipeline-connector.jar");
- }
- }
-
- private static void downloadAndCopyToContainer(
- GenericContainer<?> container, String url, String containerPath)
throws Exception {
- Path tempFile = Files.createTempFile("download-", ".tmp");
- FileUtils.copyURLToFile(
- new URL(url),
- tempFile.toFile(),
- (int) Duration.ofMinutes(1).toMillis(),
- (int) Duration.ofMinutes(5).toMillis());
- container.copyFileToContainer(MountableFile.forHostPath(tempFile),
containerPath);
- }
-
- /** Enum for all released Flink CDC version tags. */
- public enum CdcVersion {
- V3_1_1("3.1.1"),
- V3_2_0("3.2.0"),
- V3_2_1("3.2.1"),
- V3_3_0("3.3.0"),
- V3_4_0("3.4.0"),
- V3_5_0("3.5.0"),
- SNAPSHOT("SNAPSHOT");
-
- private final String version;
-
- CdcVersion(String version) {
- this.version = version;
- }
-
- public String getVersion() {
- return version;
- }
-
- public static List<CdcVersion> getAllVersions() {
- return Arrays.asList(CdcVersion.values());
- }
-
- public static List<CdcVersion> getVersionsSince(CdcVersion version) {
- return getAllVersions()
- .subList(getAllVersions().indexOf(version),
getAllVersions().size());
- }
-
- public String tarballUrl() {
- return "https://dlcdn.apache.org/flink/flink-cdc-"
- + version
- + "/flink-cdc-"
- + version
- + "-bin.tar.gz";
- }
-
- public String workDir() {
- return "/tmp/cdc/" + version;
- }
-
- public String connectorJarUrl(String name) {
- return String.format(
-
"https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-%s/%s/flink-cdc-pipeline-connector-%s-%s.jar",
- name, version, name, version);
- }
- }
-}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/my.cnf
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/my.cnf
deleted file mode 100644
index 26d1abad6..000000000
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/my.cnf
+++ /dev/null
@@ -1,64 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# For advice on how to change settings please see
-# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
-
-[mysqld]
-#
-# Remove leading # and set to the amount of RAM for the most important data
-# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
-# innodb_buffer_pool_size = 128M
-#
-# Remove leading # to turn on a very important data integrity option: logging
-# changes to the binary log between backups.
-# log_bin
-#
-# Remove leading # to set options mainly useful for reporting servers.
-# The server defaults are faster for transactions and fast SELECTs.
-# Adjust sizes as needed, experiment to find the optimal values.
-# join_buffer_size = 128M
-# sort_buffer_size = 2M
-# read_rnd_buffer_size = 2M
-skip-host-cache
-skip-name-resolve
-#datadir=/var/lib/mysql
-#socket=/var/lib/mysql/mysql.sock
-#secure-file-priv=/var/lib/mysql-files
-secure-file-priv=/var/lib/mysql
-user=mysql
-
-# Disabling symbolic-links is recommended to prevent assorted security risks
-symbolic-links=0
-
-#log-error=/var/log/mysqld.log
-#pid-file=/var/run/mysqld/mysqld.pid
-
-# ----------------------------------------------
-# Enable the binlog for replication & CDC
-# ----------------------------------------------
-
-# Enable binary replication log and set the prefix, expiration, and log format.
-# The prefix is arbitrary, expiration can be short for integration tests but
would
-# be longer on a production system. Row-level info is required for ingest to
work.
-# Server ID is required, but this will vary on production systems
-server-id = 223344
-log_bin = mysql-bin
-expire_logs_days = 1
-binlog_format = row
-
-# enable gtid mode
-gtid_mode = on
-enforce_gtid_consistency = on
\ No newline at end of file
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/setup.sql
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/setup.sql
deleted file mode 100644
index 3c0cccb4b..000000000
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/docker/mysql/setup.sql
+++ /dev/null
@@ -1,30 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements. See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
--- In production you would almost certainly limit the replication user must be
on the follower (slave) machine,
--- to prevent other clients accessing the log from other machines. For
example, 'replicator'@'follower.acme.com'.
--- However, in this database we'll grant 2 users different privileges:
---
--- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog
reader (used for testing)
--- 2) 'mysqluser' - all privileges
---
-GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,
LOCK TABLES ON *.* TO 'flinkuser'@'%';
-CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
-GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
-
---
----------------------------------------------------------------------------------------------------------------
--- DATABASE: emptydb
---
----------------------------------------------------------------------------------------------------------------
-CREATE DATABASE emptydb;
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/log4j2-test.properties
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/log4j2-test.properties
deleted file mode 100644
index 32df1c025..000000000
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/resources/log4j2-test.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to ERROR to not flood build logs
-# set manually to INFO for debugging purposes
-rootLogger.level=ERROR
-rootLogger.appenderRef.test.ref = TestLogger
-
-appender.testlogger.name = TestLogger
-appender.testlogger.type = CONSOLE
-appender.testlogger.target = SYSTEM_ERR
-appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 8c263f8de..897d1acc3 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -805,4 +805,43 @@ limitations under the License.
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>flink2</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default-test</id>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>integration-tests</id>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>end-to-end-tests</id>
+ <configuration>
+ <includes>
+
<include>**/ValuesE2eITCase.java</include>
+ </includes>
+ <excludes>
+
<exclude>**/MysqlE2eWithYarnApplicationITCase.java</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ <execution>
+ <id>run-last-test</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
similarity index 97%
rename from
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
rename to
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
index 7d863333d..0f7bc28cb 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java
@@ -47,7 +47,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
submitPipelineJob(pipelineJob);
waitUntilJobFinished(Duration.ofSeconds(30));
- LOG.info("Pipeline job is running");
+ LOG.info("Pipeline job has finished");
validateResult(
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -79,7 +79,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
submitPipelineJob(pipelineJob);
waitUntilJobFinished(Duration.ofSeconds(30));
- LOG.info("Pipeline job is running");
+ LOG.info("Pipeline job has finished");
validateResult(
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -115,7 +115,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
submitPipelineJob(pipelineJob);
waitUntilJobFinished(Duration.ofSeconds(30));
- LOG.info("Pipeline job is running");
+ LOG.info("Pipeline job has finished");
validateResult(
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -151,7 +151,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
submitPipelineJob(pipelineJob);
waitUntilJobFinished(Duration.ofSeconds(30));
- LOG.info("Pipeline job is running");
+ LOG.info("Pipeline job has finished");
validateResult(
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -186,7 +186,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
submitPipelineJob(pipelineJob);
waitUntilJobFinished(Duration.ofSeconds(30));
- LOG.info("Pipeline job is running");
+ LOG.info("Pipeline job has finished");
validateResult(
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -214,7 +214,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
submitPipelineJob(pipelineJob);
waitUntilJobFinished(Duration.ofSeconds(30));
- LOG.info("Pipeline job is running");
+ LOG.info("Pipeline job has finished");
validateResult(
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
@@ -246,7 +246,7 @@ class ValuesE2eITCase extends PipelineTestEnvironment {
submitPipelineJob(pipelineJob);
waitUntilJobFinished(Duration.ofSeconds(30));
- LOG.info("Pipeline job is running");
+ LOG.info("Pipeline job has finished");
validateResult(
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
index 1d5788918..450f683d6 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
@@ -19,10 +19,16 @@ package org.apache.flink.cdc.pipeline.tests.utils;
import org.apache.flink.cdc.common.test.utils.TestUtils;
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.FrameConsumerResultCallback;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.ToStringConsumer;
import org.testcontainers.utility.MountableFile;
import java.net.URL;
@@ -60,11 +66,11 @@ public abstract class TarballFetcher {
container.copyFileToContainer(
MountableFile.forHostPath(
- TestUtils.getResource("flink-cdc.sh",
"flink-cdc-dist", "src"), 755),
+ TestUtils.getResource("flink-cdc.sh",
"flink-cdc-dist", "src")),
version.workDir() + "/bin/flink-cdc.sh");
container.copyFileToContainer(
MountableFile.forHostPath(
- TestUtils.getResource("flink-cdc.yaml",
"flink-cdc-dist", "src"), 755),
+ TestUtils.getResource("flink-cdc.yaml",
"flink-cdc-dist", "src")),
version.workDir() + "/conf/flink-cdc.yaml");
container.copyFileToContainer(
MountableFile.forHostPath(TestUtils.getResource("flink-cdc-dist.jar")),
@@ -78,6 +84,9 @@ public abstract class TarballFetcher {
TestUtils.getResource("values-cdc-pipeline-connector.jar")),
version.workDir() +
"/lib/values-cdc-pipeline-connector.jar");
+ // Ensure the script has execute permission
+ runInContainerAsRoot(container, "chmod", "+x", version.workDir() +
"/bin/flink-cdc.sh");
+
} else {
LOG.info("CDC {} is a released version, download it from the
Internet...", version);
@@ -109,6 +118,23 @@ public abstract class TarballFetcher {
container.copyFileToContainer(MountableFile.forHostPath(tempFile),
containerPath);
}
+ private static void runInContainerAsRoot(GenericContainer<?> container,
String... command)
+ throws InterruptedException {
+ ToStringConsumer stdoutConsumer = new ToStringConsumer();
+ ToStringConsumer stderrConsumer = new ToStringConsumer();
+ DockerClient dockerClient = DockerClientFactory.instance().client();
+ ExecCreateCmdResponse execCreateCmdResponse =
+ dockerClient
+ .execCreateCmd(container.getContainerId())
+ .withUser("root")
+ .withCmd(command)
+ .exec();
+ FrameConsumerResultCallback callback = new
FrameConsumerResultCallback();
+ callback.addConsumer(OutputFrame.OutputType.STDOUT, stdoutConsumer);
+ callback.addConsumer(OutputFrame.OutputType.STDERR, stderrConsumer);
+
dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(callback).awaitCompletion();
+ }
+
/** Enum for all released Flink CDC version tags. */
public enum CdcVersion {
V3_1_1("3.1.1"),
diff --git a/flink-cdc-e2e-tests/pom.xml b/flink-cdc-e2e-tests/pom.xml
index 155c6b735..28d16beee 100644
--- a/flink-cdc-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/pom.xml
@@ -33,7 +33,6 @@ limitations under the License.
<module>flink-cdc-e2e-utils</module>
<module>flink-cdc-source-e2e-tests</module>
<module>flink-cdc-pipeline-e2e-tests</module>
- <module>flink-cdc-pipeline-e2e-tests-2.x</module>
</modules>
<build>
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java
index 6ea1ceaee..e89f3f814 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java
@@ -180,24 +180,26 @@ public class BatchDataSinkWriterOperator<CommT>
}
private Object createFlinkWriterOperator() {
+ Class<?> flinkWriterClass;
try {
- Class<?> flinkWriterClass =
+ flinkWriterClass =
getRuntimeContext()
.getUserCodeClassLoader()
.loadClass(
"org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Failed to load SinkWriterOperator
class", e);
+ }
+
+ try {
Constructor<?> constructor =
flinkWriterClass.getDeclaredConstructor(
Sink.class, ProcessingTimeService.class,
MailboxExecutor.class);
constructor.setAccessible(true);
return constructor.newInstance(sink, processingTimeService,
mailboxExecutor);
- } catch (Exception ignore) {
+ } catch (NoSuchMethodException e) {
+ // Constructor with 3 parameters not found, try the 4-parameter
version
try {
- Class<?> flinkWriterClass =
- getRuntimeContext()
- .getUserCodeClassLoader()
- .loadClass(
-
"org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator");
Constructor<?> constructor =
flinkWriterClass.getDeclaredConstructor(
StreamOperatorParameters.class,
@@ -206,9 +208,12 @@ public class BatchDataSinkWriterOperator<CommT>
MailboxExecutor.class);
constructor.setAccessible(true);
return constructor.newInstance(null, sink,
processingTimeService, mailboxExecutor);
- } catch (Exception e) {
- throw new RuntimeException("Failed to create
SinkWriterOperator in Flink", e);
+ } catch (Exception ex) {
+ throw new RuntimeException("Failed to create
SinkWriterOperator in Flink", ex);
}
+ } catch (Exception e) {
+ // Other exceptions (e.g., InvocationTargetException) indicate
real failures
+ throw new RuntimeException("Failed to create SinkWriterOperator in
Flink", e);
}
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
index 0c45b57c2..bac2929ce 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
@@ -265,24 +265,26 @@ public class DataSinkWriterOperator<CommT>
}
private Object createFlinkWriterOperator() {
+ Class<?> flinkWriterClass;
try {
- Class<?> flinkWriterClass =
+ flinkWriterClass =
getRuntimeContext()
.getUserCodeClassLoader()
.loadClass(
"org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Failed to load SinkWriterOperator
class", e);
+ }
+
+ try {
Constructor<?> constructor =
flinkWriterClass.getDeclaredConstructor(
Sink.class, ProcessingTimeService.class,
MailboxExecutor.class);
constructor.setAccessible(true);
return constructor.newInstance(sink, processingTimeService,
mailboxExecutor);
- } catch (Exception ignore) {
+ } catch (NoSuchMethodException e) {
+ // Constructor with 3 parameters not found, try the 4-parameter
version
try {
- Class<?> flinkWriterClass =
- getRuntimeContext()
- .getUserCodeClassLoader()
- .loadClass(
-
"org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator");
Constructor<?> constructor =
flinkWriterClass.getDeclaredConstructor(
StreamOperatorParameters.class,
@@ -291,9 +293,12 @@ public class DataSinkWriterOperator<CommT>
MailboxExecutor.class);
constructor.setAccessible(true);
return constructor.newInstance(null, sink,
processingTimeService, mailboxExecutor);
- } catch (Exception e) {
- throw new RuntimeException("Failed to create
SinkWriterOperator in Flink", e);
+ } catch (Exception ex) {
+ throw new RuntimeException("Failed to create
SinkWriterOperator in Flink", ex);
}
+ } catch (Exception e) {
+ // Other exceptions (e.g., InvocationTargetException) indicate
real failures
+ throw new RuntimeException("Failed to create SinkWriterOperator in
Flink", e);
}
}