This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit d8a9c8c63ef5fb958a72c3a15e47ee29d72a481e
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Wed Jun 5 12:09:08 2024 +0800

    [FLINK-35316][base] Run E2e test cases with Flink 1.19 and latest patch versions
---
 .../flink-cdc-pipeline-e2e-tests/pom.xml           |  7 ++-
 .../flink/cdc/pipeline/tests/MysqlE2eITCase.java   | 24 ++++++----
 .../cdc/pipeline/tests/TransformE2eITCase.java     | 10 ++--
 .../tests/utils/PipelineTestEnvironment.java       | 55 +++++++++++++++------
 .../flink-cdc-source-e2e-tests/pom.xml             | 38 ++++++---------
 .../tests/utils/FlinkContainerTestEnvironment.java | 56 ++++++++++++++++------
 6 files changed, 120 insertions(+), 70 deletions(-)

diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index e785ba0a5..2326240b6 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -28,8 +28,9 @@ limitations under the License.
     <artifactId>flink-cdc-pipeline-e2e-tests</artifactId>
 
     <properties>
-        <flink-1.17>1.17.1</flink-1.17>
-        <flink-1.18>1.18.0</flink-1.18>
+        <flink-1.17>1.17.2</flink-1.17>
+        <flink-1.18>1.18.1</flink-1.18>
+        <flink-1.19>1.19.0</flink-1.19>
         <mysql.driver.version>8.0.27</mysql.driver.version>
         
<starrocks.connector.version>1.2.9_flink-${flink.major.version}</starrocks.connector.version>
     </properties>
@@ -88,12 +89,14 @@ limitations under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cdc-pipeline-connector-doris</artifactId>
             <version>${project.version}</version>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
             <version>${project.version}</version>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 330717586..28db063ef 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -114,12 +114,12 @@ public class MysqlE2eITCase extends 
PipelineTestEnvironment {
         submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, 
mysqlDriverJar);
         waitUntilJobRunning(Duration.ofSeconds(30));
         LOG.info("Pipeline job is running");
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.customers, before=[], 
after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
                         mysqlInventoryDatabase.getDatabaseName()),
                 60000L);
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.products, before=[], 
after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, 
meta=()}",
                         mysqlInventoryDatabase.getDatabaseName()),
@@ -186,6 +186,14 @@ public class MysqlE2eITCase extends 
PipelineTestEnvironment {
                 Statement stat = conn.createStatement()) {
             stat.execute("UPDATE products SET description='18oz carpenter 
hammer' WHERE id=106;");
             stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+            // Perform DDL changes after the binlog is generated
+            waitUntilSpecificEvent(
+                    String.format(
+                            "DataChangeEvent{tableId=%s.products, before=[106, 
hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 
18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
+                            mysqlInventoryDatabase.getDatabaseName()),
+                    20000L);
+
             // modify table schema
             stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
             stat.execute(
@@ -201,7 +209,7 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
             throw e;
         }
 
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.products, before=[111, 
scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, 
meta=()}",
                         mysqlInventoryDatabase.getDatabaseName()),
@@ -236,17 +244,13 @@ public class MysqlE2eITCase extends 
PipelineTestEnvironment {
         validateResult(expectedEvents);
     }
 
-    private void validateResult(List<String> expectedEvents) {
-        String stdout = taskManagerConsumer.toUtf8String();
+    private void validateResult(List<String> expectedEvents) throws Exception {
         for (String event : expectedEvents) {
-            if (!stdout.contains(event)) {
-                throw new RuntimeException(
-                        "failed to get specific event: " + event + " from 
stdout: " + stdout);
-            }
+            waitUntilSpecificEvent(event, 6000L);
         }
     }
 
-    private void waitUtilSpecificEvent(String event, long timeout) throws 
Exception {
+    private void waitUntilSpecificEvent(String event, long timeout) throws 
Exception {
         boolean result = false;
         long endTimeout = System.currentTimeMillis() + timeout;
         while (System.currentTimeMillis() < endTimeout) {
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index cd8fc0b45..2f896d334 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -132,13 +132,13 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[], 
after=[1011, 11], op=INSERT, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
-                6000L);
+                60000L);
 
         waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[], 
after=[2014, 14], op=INSERT, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
-                6000L);
+                60000L);
 
         List<String> expectedEvents =
                 Arrays.asList(
@@ -188,19 +188,19 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[], 
after=[3007, 7], op=INSERT, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
-                6000L);
+                20000L);
 
         waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[1009, 
8.1], after=[1009, 100], op=UPDATE, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
-                6000L);
+                20000L);
 
         waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[2011, 
11], after=[], op=DELETE, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
-                6000L);
+                20000L);
 
         String stdout = taskManagerConsumer.toUtf8String();
         System.out.println(stdout);
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index a448bf554..65c0a202e 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.TestLogger;
 
+import com.fasterxml.jackson.core.Version;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -54,6 +55,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -71,17 +73,6 @@ public abstract class PipelineTestEnvironment extends 
TestLogger {
     public static final int JOB_MANAGER_REST_PORT = 8081;
     public static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
     public static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
-    public static final String FLINK_PROPERTIES =
-            String.join(
-                    "\n",
-                    Arrays.asList(
-                            "jobmanager.rpc.address: jobmanager",
-                            "taskmanager.numberOfTaskSlots: 10",
-                            "parallelism.default: 4",
-                            "execution.checkpointing.interval: 300",
-                            // this is needed for oracle-cdc tests.
-                            // see https://stackoverflow.com/a/47062742/4915129
-                            "env.java.opts: 
-Doracle.jdbc.timezoneAsRegion=false"));
 
     @ClassRule public static final Network NETWORK = Network.newNetwork();
 
@@ -97,13 +88,16 @@ public abstract class PipelineTestEnvironment extends 
TestLogger {
 
     @Parameterized.Parameters(name = "flinkVersion: {0}")
     public static List<String> getFlinkVersion() {
-        return Arrays.asList("1.17.1", "1.18.0");
+        return Arrays.asList("1.17.2", "1.18.1", "1.19.0");
     }
 
     @Before
     public void before() throws Exception {
         LOG.info("Starting containers...");
         jobManagerConsumer = new ToStringConsumer();
+
+        String flinkProperties = getFlinkProperties(flinkVersion);
+
         jobManager =
                 new GenericContainer<>(getFlinkDockerImageTag())
                         .withCommand("jobmanager")
@@ -111,7 +105,7 @@ public abstract class PipelineTestEnvironment extends 
TestLogger {
                         .withExtraHost("host.docker.internal", "host-gateway")
                         .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
                         .withExposedPorts(JOB_MANAGER_REST_PORT)
-                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .withEnv("FLINK_PROPERTIES", flinkProperties)
                         .withLogConsumer(jobManagerConsumer);
         taskManagerConsumer = new ToStringConsumer();
         taskManager =
@@ -120,7 +114,7 @@ public abstract class PipelineTestEnvironment extends 
TestLogger {
                         .withExtraHost("host.docker.internal", "host-gateway")
                         .withNetwork(NETWORK)
                         .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
-                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .withEnv("FLINK_PROPERTIES", flinkProperties)
                         .dependsOn(jobManager)
                         .withLogConsumer(taskManagerConsumer);
 
@@ -246,4 +240,37 @@ public abstract class PipelineTestEnvironment extends 
TestLogger {
     protected String getFlinkDockerImageTag() {
         return String.format("flink:%s-scala_2.12", flinkVersion);
     }
+
+    private static Version parseVersion(String version) {
+        List<Integer> versionParts =
+                Arrays.stream(version.split("\\."))
+                        .map(Integer::valueOf)
+                        .limit(3)
+                        .collect(Collectors.toList());
+        return new Version(
+                versionParts.get(0), versionParts.get(1), versionParts.get(2), 
null, null, null);
+    }
+
+    private static String getFlinkProperties(String flinkVersion) {
+        // this is needed for oracle-cdc tests.
+        // see https://stackoverflow.com/a/47062742/4915129
+        String javaOptsConfig;
+        Version version = parseVersion(flinkVersion);
+        if (version.compareTo(parseVersion("1.17.0")) >= 0) {
+            // Flink 1.17 renames `env.java.opts` to `env.java.opts.all`
+            javaOptsConfig = "env.java.opts.all: 
-Doracle.jdbc.timezoneAsRegion=false";
+        } else {
+            // Legacy Flink version, might drop their support in near future
+            javaOptsConfig = "env.java.opts: 
-Doracle.jdbc.timezoneAsRegion=false";
+        }
+
+        return String.join(
+                "\n",
+                Arrays.asList(
+                        "jobmanager.rpc.address: jobmanager",
+                        "taskmanager.numberOfTaskSlots: 10",
+                        "parallelism.default: 4",
+                        "execution.checkpointing.interval: 300",
+                        javaOptsConfig));
+    }
 }
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
index adb3e0f82..6f604f346 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
@@ -28,13 +28,13 @@ limitations under the License.
     <artifactId>flink-cdc-source-e2e-tests</artifactId>
 
     <properties>
-        <flink-1.14>1.14.6</flink-1.14>
-        <flink-1.15>1.15.4</flink-1.15>
-        <flink-1.16>1.16.2</flink-1.16>
-        <flink-1.17>1.17.1</flink-1.17>
-        <flink-1.18>1.18.0</flink-1.18>
+        <flink-1.16>1.16.3</flink-1.16>
+        <flink-1.17>1.17.2</flink-1.17>
+        <flink-1.18>1.18.1</flink-1.18>
+        <flink-1.19>1.19.0</flink-1.19>
         <jdbc.version-1.17>3.1.1-1.17</jdbc.version-1.17>
-        <jdbc.version-1.18>3.1.1-1.17</jdbc.version-1.18>
+        <jdbc.version-1.18>3.1.2-1.18</jdbc.version-1.18>
+        <jdbc.version-1.19>3.1.2-1.18</jdbc.version-1.19>
         <mysql.driver.version>8.0.27</mysql.driver.version>
         <postgresql.driver.version>42.5.1</postgresql.driver.version>
     </properties>
@@ -237,21 +237,11 @@ limitations under the License.
                             </outputDirectory>
                         </artifactItem>
 
-                        <artifactItem>
-                            <groupId>org.apache.flink</groupId>
-                            <artifactId>flink-connector-jdbc_2.11</artifactId>
-                            <version>${flink-1.14}</version>
-                            
<destFileName>jdbc-connector_${flink-1.14}.jar</destFileName>
-                            <type>jar</type>
-                            
<outputDirectory>${project.build.directory}/dependencies
-                            </outputDirectory>
-                        </artifactItem>
-
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
                             <artifactId>flink-connector-jdbc</artifactId>
-                            <version>${flink-1.15}</version>
-                            
<destFileName>jdbc-connector_${flink-1.15}.jar</destFileName>
+                            <version>${flink-1.16}</version>
+                            
<destFileName>jdbc-connector_${flink-1.16}.jar</destFileName>
                             <type>jar</type>
                             
<outputDirectory>${project.build.directory}/dependencies
                             </outputDirectory>
@@ -260,8 +250,8 @@ limitations under the License.
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
                             <artifactId>flink-connector-jdbc</artifactId>
-                            <version>${flink-1.16}</version>
-                            
<destFileName>jdbc-connector_${flink-1.16}.jar</destFileName>
+                            <version>${jdbc.version-1.17}</version>
+                            
<destFileName>jdbc-connector_${flink-1.17}.jar</destFileName>
                             <type>jar</type>
                             
<outputDirectory>${project.build.directory}/dependencies
                             </outputDirectory>
@@ -270,8 +260,8 @@ limitations under the License.
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
                             <artifactId>flink-connector-jdbc</artifactId>
-                            <version>${jdbc.version-1.17}</version>
-                            
<destFileName>jdbc-connector_${flink-1.17}.jar</destFileName>
+                            <version>${jdbc.version-1.18}</version>
+                            
<destFileName>jdbc-connector_${flink-1.18}.jar</destFileName>
                             <type>jar</type>
                             
<outputDirectory>${project.build.directory}/dependencies
                             </outputDirectory>
@@ -280,8 +270,8 @@ limitations under the License.
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
                             <artifactId>flink-connector-jdbc</artifactId>
-                            <version>${jdbc.version-1.18}</version>
-                            
<destFileName>jdbc-connector_${flink-1.18}.jar</destFileName>
+                            <version>${jdbc.version-1.19}</version>
+                            
<destFileName>jdbc-connector_${flink-1.19}.jar</destFileName>
                             <type>jar</type>
                             
<outputDirectory>${project.build.directory}/dependencies
                             </outputDirectory>
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
index 650e205b8..6175aec2c 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.TestLogger;
 
+import com.fasterxml.jackson.core.Version;
 import com.github.dockerjava.api.DockerClient;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -62,6 +63,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -80,17 +82,6 @@ public abstract class FlinkContainerTestEnvironment extends 
TestLogger {
     private static final String FLINK_BIN = "bin";
     private static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
     private static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
-    private static final String FLINK_PROPERTIES =
-            String.join(
-                    "\n",
-                    Arrays.asList(
-                            "jobmanager.rpc.address: jobmanager",
-                            "taskmanager.numberOfTaskSlots: 10",
-                            "parallelism.default: 4",
-                            "execution.checkpointing.interval: 10000",
-                            // this is needed for oracle-cdc tests.
-                            // see https://stackoverflow.com/a/47062742/4915129
-                            "env.java.opts: 
-Doracle.jdbc.timezoneAsRegion=false"));
 
     // 
------------------------------------------------------------------------------------------
     // MySQL Variables (we always use MySQL as the sink for easier verifying)
@@ -129,17 +120,19 @@ public abstract class FlinkContainerTestEnvironment 
extends TestLogger {
 
     @Parameterized.Parameters(name = "flinkVersion: {0}")
     public static List<String> getFlinkVersion() {
-        return Arrays.asList("1.14.6", "1.15.4", "1.16.2", "1.17.1", "1.18.0");
+        return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.0");
     }
 
     private static final List<String> FLINK_VERSION_WITH_SCALA_212 =
-            Arrays.asList("1.15.4", "1.16.2", "1.17.1", "1.18.0");
+            Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.0");
 
     @Before
     public void before() {
         mysqlInventoryDatabase.createAndInitialize();
         jdbcJar = TestUtils.getResource(getJdbcConnectorResourceName());
 
+        String flinkProperties = getFlinkProperties(flinkVersion);
+
         LOG.info("Starting containers...");
         jobManager =
                 new GenericContainer<>(getFlinkDockerImageTag())
@@ -148,7 +141,7 @@ public abstract class FlinkContainerTestEnvironment extends 
TestLogger {
                         .withExtraHost("host.docker.internal", "host-gateway")
                         .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
                         .withExposedPorts(JOB_MANAGER_REST_PORT)
-                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .withEnv("FLINK_PROPERTIES", flinkProperties)
                         .withLogConsumer(new Slf4jLogConsumer(LOG));
         taskManager =
                 new GenericContainer<>(getFlinkDockerImageTag())
@@ -156,7 +149,7 @@ public abstract class FlinkContainerTestEnvironment extends 
TestLogger {
                         .withExtraHost("host.docker.internal", "host-gateway")
                         .withNetwork(NETWORK)
                         .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
-                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .withEnv("FLINK_PROPERTIES", flinkProperties)
                         .dependsOn(jobManager)
                         .withLogConsumer(new Slf4jLogConsumer(LOG));
 
@@ -325,4 +318,37 @@ public abstract class FlinkContainerTestEnvironment 
extends TestLogger {
     protected String getJdbcConnectorResourceName() {
         return String.format("jdbc-connector_%s.jar", flinkVersion);
     }
+
+    private static Version parseVersion(String version) {
+        List<Integer> versionParts =
+                Arrays.stream(version.split("\\."))
+                        .map(Integer::valueOf)
+                        .limit(3)
+                        .collect(Collectors.toList());
+        return new Version(
+                versionParts.get(0), versionParts.get(1), versionParts.get(2), 
null, null, null);
+    }
+
+    private static String getFlinkProperties(String flinkVersion) {
+        // this is needed for oracle-cdc tests.
+        // see https://stackoverflow.com/a/47062742/4915129
+        String javaOptsConfig;
+        Version version = parseVersion(flinkVersion);
+        if (version.compareTo(parseVersion("1.17.0")) >= 0) {
+            // Flink 1.17 renames `env.java.opts` to `env.java.opts.all`
+            javaOptsConfig = "env.java.opts.all: 
-Doracle.jdbc.timezoneAsRegion=false";
+        } else {
+            // Legacy Flink version, might drop their support in near future
+            javaOptsConfig = "env.java.opts: 
-Doracle.jdbc.timezoneAsRegion=false";
+        }
+
+        return String.join(
+                "\n",
+                Arrays.asList(
+                        "jobmanager.rpc.address: jobmanager",
+                        "taskmanager.numberOfTaskSlots: 10",
+                        "parallelism.default: 4",
+                        "execution.checkpointing.interval: 300",
+                        javaOptsConfig));
+    }
 }

Reply via email to