This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 82bf8a08e [tests][ci] Miscellaneous improvements on CI robustness
82bf8a08e is described below

commit 82bf8a08ef4e73b79ac6a69f50baeeae22bf04dc
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Tue Feb 11 20:16:38 2025 +0800

    [tests][ci] Miscellaneous improvements on CI robustness
    
    This closes #3911
---
 .github/workflows/flink_cdc_base.yml               |  3 +-
 .github/workflows/flink_cdc_ci.yml                 |  2 +
 .github/workflows/flink_cdc_ci_nightly.yml         |  2 +
 .../workflows/flink_cdc_migration_test_base.yml    |  3 +
 .../flink-connector-mysql-cdc/pom.xml              |  7 --
 .../connectors/mysql/LegacyMySqlSourceITCase.java  | 56 +++++++------
 .../connectors/polardbx/PolardbxCharsetITCase.java |  4 +-
 .../connectors/polardbx/PolardbxSourceITCase.java  | 12 +--
 .../polardbx/PolardbxSourceTestBase.java           | 49 ++++++------
 ...m-data-schema-exclude-with-numeric-decimal.json | 48 ++++++------
 .../file/debezium-data-schema-exclude.json         | 48 ++++++------
 .../file/debezium-data-schema-include.json         | 48 ++++++------
 tools/mig-test/datastream/compile_jobs.rb          |  2 +-
 tools/mig-test/datastream/datastream-3.2.0/pom.xml | 91 ++++++++++++++++++++--
 .../src/main/java/DataStreamJob.java               |  8 +-
 tools/mig-test/datastream/datastream-3.2.1/pom.xml | 89 +++++++++++++++++++--
 .../src/main/java/DataStreamJob.java               |  8 +-
 tools/mig-test/datastream/datastream-3.3.0/pom.xml | 91 ++++++++++++++++++++--
 .../src/main/java/DataStreamJob.java               |  8 +-
 .../datastream/datastream-3.4-SNAPSHOT/pom.xml     | 91 ++++++++++++++++++++--
 .../src/main/java/DataStreamJob.java               |  8 +-
 tools/mig-test/datastream/run_migration_test.rb    | 58 ++++++--------
 tools/mig-test/run_migration_test.rb               |  8 +-
 23 files changed, 504 insertions(+), 240 deletions(-)

diff --git a/.github/workflows/flink_cdc_base.yml b/.github/workflows/flink_cdc_base.yml
index 204878c4e..dcb461ee2 100644
--- a/.github/workflows/flink_cdc_base.yml
+++ b/.github/workflows/flink_cdc_base.yml
@@ -108,6 +108,7 @@ jobs:
     runs-on: ubuntu-latest
     timeout-minutes: 120
     strategy:
+      fail-fast: false
       matrix:
         java-version: ${{ fromJSON(inputs.java-version) }}
         flink-version: ${{ fromJSON(inputs.flink-version) }}
@@ -148,7 +149,7 @@ jobs:
           maven-version: 3.8.6
 
       - name: Compile and test
-        timeout-minutes: 90
+        timeout-minutes: 60
         run: |
           set -o pipefail
 
diff --git a/.github/workflows/flink_cdc_ci.yml b/.github/workflows/flink_cdc_ci.yml
index 76e7691e3..47f1c77c2 100644
--- a/.github/workflows/flink_cdc_ci.yml
+++ b/.github/workflows/flink_cdc_ci.yml
@@ -59,6 +59,7 @@ jobs:
         run: gem install rubyzip -v 2.3.0 && ./tools/ci/license_check.rb
   ut:
     strategy:
+      fail-fast: false
       matrix:
         module: [ 'core', 'pipeline_connectors', 'mysql', 'postgres', 'oracle', 'mongodb6', 'mongodb7', 'sqlserver', 'tidb', 'oceanbase', 'db2', 'vitess' ]
     name: Unit Tests
@@ -68,6 +69,7 @@ jobs:
       module: ${{ matrix.module }}
   pipeline_e2e:
     strategy:
+      fail-fast: false
       matrix:
         parallelism: [ 1, 4 ]
     name: Pipeline E2E Tests (${{ matrix.parallelism }}-Parallelism)
diff --git a/.github/workflows/flink_cdc_ci_nightly.yml b/.github/workflows/flink_cdc_ci_nightly.yml
index 456c5f892..c69238f42 100644
--- a/.github/workflows/flink_cdc_ci_nightly.yml
+++ b/.github/workflows/flink_cdc_ci_nightly.yml
@@ -48,6 +48,7 @@ jobs:
         run: gem install rubyzip -v 2.3.0 && ./tools/ci/license_check.rb
   ut:
     strategy:
+      fail-fast: false
       matrix:
         module: [ 'core', 'pipeline_connectors', 'mysql', 'postgres', 'oracle', 'mongodb6', 'mongodb7', 'sqlserver', 'tidb', 'oceanbase', 'db2', 'vitess' ]
     name: Unit Tests
@@ -57,6 +58,7 @@ jobs:
       module: ${{ matrix.module }}
   pipeline_e2e:
     strategy:
+      fail-fast: false
       matrix:
         parallelism: [ 1, 4 ]
     name: Pipeline E2E Tests (${{ matrix.parallelism }} Parallelism)
diff --git a/.github/workflows/flink_cdc_migration_test_base.yml b/.github/workflows/flink_cdc_migration_test_base.yml
index fa8012fd3..ff707c0fd 100644
--- a/.github/workflows/flink_cdc_migration_test_base.yml
+++ b/.github/workflows/flink_cdc_migration_test_base.yml
@@ -32,6 +32,7 @@ jobs:
   migration_test_ut:
     runs-on: ubuntu-latest
     strategy:
+      fail-fast: false
       matrix:
         java-version: ${{ fromJSON(inputs.java-version) }}
         flink-version: ${{ fromJSON(inputs.flink-version) }}
@@ -52,6 +53,7 @@ jobs:
   pipeline_migration_test:
     runs-on: ubuntu-latest
     strategy:
+      fail-fast: false
       matrix:
         java-version: ${{ fromJSON(inputs.java-version) }}
         flink-version: ${{ fromJSON(inputs.flink-version) }}
@@ -91,6 +93,7 @@ jobs:
   data_stream_migration_test:
     runs-on: ubuntu-latest
     strategy:
+      fail-fast: false
       matrix:
         java-version: ${{ fromJSON(inputs.java-version) }}
         flink-version: [ '1.19.1', '1.20.0' ]
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
index 8e6886e15..96366a9af 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
@@ -98,13 +98,6 @@ limitations under the License.
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
-            <version>1.2.83</version>
-            <scope>test</scope>
-        </dependency>
-
         <!-- test dependencies on Flink -->
 
         <dependency>
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/LegacyMySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/LegacyMySqlSourceITCase.java
index de7acd86a..b562c7829 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/LegacyMySqlSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/LegacyMySqlSourceITCase.java
@@ -28,8 +28,11 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
-import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -46,8 +49,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-import static org.junit.Assert.assertTrue;
-
 /** Integration tests for the legacy {@link MySqlSource}. */
 public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {
 
@@ -99,9 +100,9 @@ public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {
                 StreamTableEnvironment.create(
                         env, EnvironmentSettings.newInstance().inStreamingMode().build());
 
-        final JSONObject expected =
-                JSONObject.parseObject(readLines(expectedFile), JSONObject.class);
-        JSONObject expectSnapshot = expected.getJSONObject("expected_snapshot");
+        final JsonNode expected =
+                new ObjectMapper().readValue(readLines(expectedFile), JsonNode.class);
+        JsonNode expectSnapshot = expected.get("expected_snapshot");
 
         DataStreamSource<String> source = env.addSource(sourceFunction);
         tEnv.createTemporaryView("full_types", source);
@@ -110,9 +111,8 @@ public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {
         // check the snapshot result
         CloseableIterator<Row> snapshot = result.collect();
         waitForSnapshotStarted(snapshot);
-        assertTrue(
-                dataInJsonIsEquals(
-                        fetchRows(snapshot, 1).get(0).toString(), expectSnapshot.toString()));
+
+        assertJsonEquals(extractJsonBody(snapshot.next()), expectSnapshot);
         try (Connection connection = fullTypesDatabase.getJdbcConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute(
@@ -121,10 +121,8 @@ public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {
 
         // check the binlog result
         CloseableIterator<Row> binlog = result.collect();
-        JSONObject expectBinlog = expected.getJSONObject("expected_binlog");
-        assertTrue(
-                dataInJsonIsEquals(
-                        fetchRows(binlog, 1).get(0).toString(), expectBinlog.toString()));
+        JsonNode expectBinlog = expected.get("expected_binlog");
+        assertJsonEquals(extractJsonBody(binlog.next()), expectBinlog);
         result.getJobClient().get().cancel().get();
     }
 
@@ -164,25 +162,23 @@ public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {
         return Files.readAllBytes(path);
     }
 
-    private static boolean dataInJsonIsEquals(String actual, String expect) {
-        JSONObject actualJsonObject = JSONObject.parseObject(actual);
-        JSONObject expectJsonObject = JSONObject.parseObject(expect);
-
-        if (expectJsonObject.getJSONObject("payload") != null
-                && actualJsonObject.getJSONObject("payload") != null) {
-            expectJsonObject = expectJsonObject.getJSONObject("payload");
-            actualJsonObject = actualJsonObject.getJSONObject("payload");
+    private static void assertJsonEquals(JsonNode actual, JsonNode expect) throws Exception {
+        if (actual.get("payload") != null && expect.get("payload") != null) {
+            actual = actual.get("payload");
+            expect = expect.get("payload");
         }
-        return jsonObjectEquals(
-                        expectJsonObject.getJSONObject("after"),
-                        actualJsonObject.getJSONObject("after"))
-                && jsonObjectEquals(
-                        expectJsonObject.getJSONObject("before"),
-                        actualJsonObject.getJSONObject("before"))
-                && Objects.equals(expectJsonObject.get("op"), actualJsonObject.get("op"));
+        Assertions.assertThat(actual.get("after")).isEqualTo(expect.get("after"));
+        Assertions.assertThat(actual.get("before")).isEqualTo(expect.get("before"));
+        Assertions.assertThat(actual.get("op")).isEqualTo(expect.get("op"));
     }
 
-    private static boolean jsonObjectEquals(JSONObject a, JSONObject b) {
-        return (a == b) || (a != null && a.toString().equals(b.toString()));
+    private static JsonNode extractJsonBody(Row row) {
+        try {
+            String body = row.toString();
+            return new ObjectMapper()
+                    .readValue(body.substring(3, body.length() - 1), JsonNode.class);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("Invalid JSON format.", e);
+        }
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxCharsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxCharsetITCase.java
index d4ecb5d97..d8507c477 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxCharsetITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxCharsetITCase.java
@@ -167,8 +167,8 @@ public class PolardbxCharsetITCase extends PolardbxSourceTestBase {
                                 + " 'scan.incremental.snapshot.chunk.size' = '%s'"
                                 + ")",
                         testName,
-                        HOST_NAME,
-                        PORT,
+                        getHost(),
+                        getPort(),
                         USER_NAME,
                         PASSWORD,
                         DATABASE,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java
index db87a98ac..901a424fb 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java
@@ -83,8 +83,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
                                 + " 'server-time-zone' = 'UTC',"
                                 + " 'server-id' = '%s'"
                                 + ")",
-                        HOST_NAME,
-                        PORT,
+                        getHost(),
+                        getPort(),
                         USER_NAME,
                         PASSWORD,
                         DATABASE,
@@ -234,8 +234,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
                                 + " 'server-time-zone' = 'UTC',"
                                 + " 'server-id' = '%s'"
                                 + ")",
-                        HOST_NAME,
-                        PORT,
+                        getHost(),
+                        getPort(),
                         USER_NAME,
                         PASSWORD,
                         DATABASE,
@@ -301,8 +301,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
                                 + " 'server-time-zone' = 'UTC',"
                                 + " 'server-id' = '%s'"
                                 + ")",
-                        HOST_NAME,
-                        PORT,
+                        getHost(),
+                        getPort(),
                         USER_NAME,
                         PASSWORD,
                         DATABASE,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java
index 43c3c6783..65149c883 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java
@@ -17,13 +17,11 @@
 
 package org.apache.flink.cdc.connectors.polardbx;
 
+import org.apache.flink.cdc.common.utils.TestCaseUtils;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
-import com.github.dockerjava.api.model.ExposedPort;
-import com.github.dockerjava.api.model.PortBinding;
-import com.github.dockerjava.api.model.Ports;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -64,36 +62,37 @@ import static org.junit.Assert.assertTrue;
 public abstract class PolardbxSourceTestBase extends AbstractTestBase {
     private static final Logger LOG = LoggerFactory.getLogger(PolardbxSourceTestBase.class);
     private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
-    protected static final Integer PORT = 8527;
-    protected static final String HOST_NAME = "127.0.0.1";
-    protected static final String USER_NAME = "polardbx_root";
-    protected static final String PASSWORD = "123456";
+
     private static final String IMAGE_VERSION = "2.1.0";
     private static final DockerImageName POLARDBX_IMAGE =
             DockerImageName.parse("polardbx/polardb-x:" + IMAGE_VERSION);
 
+    protected static final Integer INNER_PORT = 8527;
+    protected static final String USER_NAME = "polardbx_root";
+    protected static final String PASSWORD = "123456";
+    protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(1);
+
     protected static final GenericContainer POLARDBX_CONTAINER =
             new GenericContainer<>(POLARDBX_IMAGE)
-                    .withExposedPorts(PORT)
+                    .withExposedPorts(INNER_PORT)
                     .withLogConsumer(new Slf4jLogConsumer(LOG))
-                    .withStartupTimeout(Duration.ofMinutes(3))
-                    .withCreateContainerCmdModifier(
-                            c ->
-                                    c.withPortBindings(
-                                            new PortBinding(
-                                                    Ports.Binding.bindPort(PORT),
-                                                    new ExposedPort(PORT))));
+                    .withStartupTimeout(Duration.ofMinutes(3));
+
+    protected static String getHost() {
+        return POLARDBX_CONTAINER.getHost();
+    }
+
+    protected static int getPort() {
+        return POLARDBX_CONTAINER.getMappedPort(INNER_PORT);
+    }
 
     @BeforeClass
-    public static void startContainers() throws InterruptedException {
-        // no need to start container when the port 8527 is listening
-        if (!checkConnection()) {
-            LOG.info("Polardbx connection is not valid, so try to start containers...");
-            Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
-            LOG.info("Containers are started.");
-            // here should wait 10s that make sure the polardbx is ready
-            Thread.sleep(10 * 1000);
-        }
+    public static void startContainers() {
+        Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
+        LOG.info("Containers are started.");
+
+        TestCaseUtils.repeatedCheck(
+                PolardbxSourceTestBase::checkConnection, WAITING_TIMEOUT, Duration.ofSeconds(1));
     }
 
     @AfterClass
@@ -104,7 +103,7 @@ public abstract class PolardbxSourceTestBase extends AbstractTestBase {
     }
 
     protected static String getJdbcUrl() {
-        return String.format("jdbc:mysql://%s:%s", HOST_NAME, PORT);
+        return String.format("jdbc:mysql://%s:%s", getHost(), getPort());
     }
 
     protected static Connection getJdbcConnection() throws SQLException {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-exclude-with-numeric-decimal.json b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-exclude-with-numeric-decimal.json
index f11a4cb39..20c641df5 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-exclude-with-numeric-decimal.json
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-exclude-with-numeric-decimal.json
@@ -52,14 +52,14 @@
             "enum_c": "red",
             "set_c": "a,b",
             "json_c": "{\"key1\": \"value1\"}",
-            "point_c": {"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/"},
-            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA"},
-            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA"},
-            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA="},
-            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA"},
-            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA="}
+            "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA","srid":null},
+            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA","srid":null},
+            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA=","srid":null},
+            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA","srid":null},
+            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA=","srid":null}
         },
         "op": "r",
         "transaction": null
@@ -116,14 +116,14 @@
             "enum_c": "red",
             "set_c": "a,b",
             "json_c": "{\"key1\":\"value1\"}",
-            "point_c": {"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/"},
-            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA"},
-            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA"},
-            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA="},
-            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA"},
-            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA="}
+            "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA","srid":null},
+            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA","srid":null},
+            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA=","srid":null},
+            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA","srid":null},
+            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA=","srid":null}
         },
         "after": {
             "id": 1,
@@ -176,14 +176,14 @@
             "enum_c": "red",
             "set_c": "a,b",
             "json_c": "{\"key1\":\"value1\"}",
-            "point_c": {"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/"},
-            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA"},
-            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA"},
-            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA="},
-            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA"},
-            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA="}
+            "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA","srid":null},
+            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA","srid":null},
+            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA=","srid":null},
+            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA","srid":null},
+            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA=","srid":null}
         },
         "op": "u",
         "transaction": null
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-exclude.json b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-exclude.json
index 3e5dedd08..41cc94b37 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-exclude.json
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-exclude.json
@@ -52,14 +52,14 @@
             "enum_c": "red",
             "set_c": "a,b",
             "json_c": "{\"key1\": \"value1\"}",
-            "point_c": {"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/"},
-            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA"},
-            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA"},
-            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA="},
-            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA"},
-            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA="}
+            "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA","srid":null},
+            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA","srid":null},
+            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA=","srid":null},
+            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA","srid":null},
+            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA=","srid":null}
         },
         "op": "r",
         "transaction": null
@@ -116,14 +116,14 @@
             "enum_c": "red",
             "set_c": "a,b",
             "json_c": "{\"key1\":\"value1\"}",
-            "point_c": {"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/"},
-            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA"},
-            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA"},
-            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA="},
-            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA"},
-            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA="}
+            "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA","srid":null},
+            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA","srid":null},
+            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA=","srid":null},
+            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA","srid":null},
+            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA=","srid":null}
         },
         "after": {
             "id": "AQ==",
@@ -176,14 +176,14 @@
             "enum_c": "red",
             "set_c": "a,b",
             "json_c": "{\"key1\":\"value1\"}",
-            "point_c": {"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/"},
-            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA"},
-            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA"},
-            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA="},
-            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA"},
-            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA="}
+            "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA","srid":null},
+            "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+            "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA","srid":null},
+            "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA=","srid":null},
+            "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA","srid":null},
+            "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA=","srid":null}
         },
         "op": "u",
         "transaction": null
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-include.json
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-include.json
index fed8c09a9..74dc65456 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-include.json
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-include.json
@@ -725,14 +725,14 @@
                 "enum_c": "red",
                 "set_c": "a,b",
                 "json_c": "{\"key1\": \"value1\"}",
-                "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/"},
-                
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-                "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA"},
-                "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-                "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA"},
-                "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA="},
-                "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA"},
-                "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA="}
+                "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/","srid":null},
+                
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+                "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA","srid":null},
+                "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+                "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA","srid":null},
+                "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA=","srid":null},
+                "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA","srid":null},
+                "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA=","srid":null}
             },
             "source": {
                 "version": "1.5.2.Final",
@@ -1080,14 +1080,14 @@
                 "enum_c": "red",
                 "set_c": "a,b",
                 "json_c": "{\"key1\":\"value1\"}",
-                "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/"},
-                
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-                "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA"},
-                "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-                "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA"},
-                "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA="},
-                "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA"},
-                "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA="}
+                "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/","srid":null},
+                
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+                "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA","srid":null},
+                "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+                "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA","srid":null},
+                "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA=","srid":null},
+                "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA","srid":null},
+                "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA=","srid":null}
             },
             "after": {
                 "id": "AQ==",
@@ -1140,14 +1140,14 @@
                 "enum_c": "red",
                 "set_c": "a,b",
                 "json_c": "{\"key1\":\"value1\"}",
-                "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/"},
-                
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-                "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA"},
-                "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/"},
-                "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA"},
-                "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA="},
-                "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA"},
-                "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA="}
+                "point_c": 
{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/","srid":null},
+                
"geometry_c":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+                "linestring_c": 
{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA","srid":null},
+                "polygon_c": 
{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},
+                "multipoint_c": 
{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA","srid":null},
+                "multiline_c": 
{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA=","srid":null},
+                "multipolygon_c": 
{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA","srid":null},
+                "geometrycollection_c": 
{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA=","srid":null}
             },
             "source": {
                 "version": "1.5.2.Final",
diff --git a/tools/mig-test/datastream/compile_jobs.rb 
b/tools/mig-test/datastream/compile_jobs.rb
index 5c906e5bf..2326a4540 100644
--- a/tools/mig-test/datastream/compile_jobs.rb
+++ b/tools/mig-test/datastream/compile_jobs.rb
@@ -20,7 +20,7 @@ JOB_VERSIONS = %w[3.2.0 3.2.1 3.3.0 3.4-SNAPSHOT]
 
 JOB_VERSIONS.each do |version|
   puts "Compiling DataStream job for CDC #{version}"
-  `cd datastream-#{version} && mvn clean package -DskipTests`
+  system "cd datastream-#{version} && mvn clean package -DskipTests"
 end
 
 puts 'Done'
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.2.0/pom.xml 
b/tools/mig-test/datastream/datastream-3.2.0/pom.xml
index c1f556033..b624439ed 100644
--- a/tools/mig-test/datastream/datastream-3.2.0/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.2.0/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <flink.version>1.18.1</flink.version>
         <flink.cdc.version>3.2.0</flink.cdc.version>
-        <debezium.version>1.9.7.Final</debezium.version>
+        <debezium.version>1.9.8.Final</debezium.version>
         <scala.binary.version>2.12</scala.binary.version>
         <slf4j.version>2.0.13</slf4j.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -136,20 +136,95 @@ limitations under the License.
                 </configuration>
             </plugin>
             <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
+                        <id>shade-flink</id>
                         <phase>package</phase>
                         <goals>
-                            <goal>single</goal>
+                            <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <!-- Shading test jar have bug in some previous 
version, so close this configuration here,
+                            see 
https://issues.apache.org/jira/browse/MSHADE-284 -->
+                            <shadeTestJar>false</shadeTestJar>
+                            
<shadedArtifactAttached>false</shadedArtifactAttached>
+                            
<createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>
+                                
${project.basedir}/target/dependency-reduced-pom.xml
+                            </dependencyReducedPomLocation>
+                            <filters combine.children="append">
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>module-info.class</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <artifactSet>
+                                <includes>
+                                    <include>io.debezium:debezium-api</include>
+                                    
<include>io.debezium:debezium-embedded</include>
+                                    
<include>io.debezium:debezium-core</include>
+                                    
<include>io.debezium:debezium-ddl-parser</include>
+                                    
<include>io.debezium:debezium-connector-mysql</include>
+                                    
<include>org.apache.flink:flink-connector-debezium</include>
+                                    
<include>org.apache.flink:flink-connector-mysql-cdc</include>
+                                    <include>org.antlr:antlr4-runtime</include>
+                                    <include>org.apache.kafka:*</include>
+                                    
<include>mysql:mysql-connector-java</include>
+                                    
<include>com.zendesk:mysql-binlog-connector-java</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>com.google.guava:*</include>
+                                    
<include>com.esri.geometry:esri-geometry-api</include>
+                                    <include>com.zaxxer:HikariCP</include>
+                                    <!--  Include fixed version 
30.1.1-jre-16.0 of flink shaded guava  -->
+                                    
<include>org.apache.flink:flink-shaded-guava</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>
+                                        
org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.antlr</pattern>
+                                    <shadedPattern>
+                                        
org.apache.flink.cdc.connectors.shaded.org.antlr
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    <shadedPattern>
+                                        
org.apache.flink.cdc.connectors.shaded.com.fasterxml
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>
+                                        
org.apache.flink.cdc.connectors.shaded.com.google
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.esri.geometry</pattern>
+                                    
<shadedPattern>org.apache.flink.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.zaxxer</pattern>
+                                    <shadedPattern>
+                                        
org.apache.flink.cdc.connectors.shaded.com.zaxxer
+                                    </shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
                     </execution>
                 </executions>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                </configuration>
             </plugin>
         </plugins>
     </build>
diff --git 
a/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java 
b/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java
index f821ac0a2..bfaa2d529 100644
--- 
a/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java
+++ 
b/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class DataStreamJob {
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                 .hostname("localhost")
                 .port(3306)
@@ -45,10 +45,6 @@ public class DataStreamJob {
                 .print()
                 .setParallelism(1);
 
-        try {
-            env.execute();
-        } catch (Exception e) {
-            // ... unfortunately
-        }
+        env.execute();
     }
 }
diff --git a/tools/mig-test/datastream/datastream-3.2.1/pom.xml 
b/tools/mig-test/datastream/datastream-3.2.1/pom.xml
index c7d680a2f..08c7337af 100644
--- a/tools/mig-test/datastream/datastream-3.2.1/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.2.1/pom.xml
@@ -136,20 +136,95 @@ limitations under the License.
                 </configuration>
             </plugin>
             <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
+                        <id>shade-flink</id>
                         <phase>package</phase>
                         <goals>
-                            <goal>single</goal>
+                            <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <!-- Shading test jar have bug in some previous 
version, so close this configuration here,
+                            see 
https://issues.apache.org/jira/browse/MSHADE-284 -->
+                            <shadeTestJar>false</shadeTestJar>
+                            
<shadedArtifactAttached>false</shadedArtifactAttached>
+                            
<createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>
+                                
${project.basedir}/target/dependency-reduced-pom.xml
+                            </dependencyReducedPomLocation>
+                            <filters combine.children="append">
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>module-info.class</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <artifactSet>
+                                <includes>
+                                    <include>io.debezium:debezium-api</include>
+                                    
<include>io.debezium:debezium-embedded</include>
+                                    
<include>io.debezium:debezium-core</include>
+                                    
<include>io.debezium:debezium-ddl-parser</include>
+                                    
<include>io.debezium:debezium-connector-mysql</include>
+                                    
<include>org.apache.flink:flink-connector-debezium</include>
+                                    
<include>org.apache.flink:flink-connector-mysql-cdc</include>
+                                    <include>org.antlr:antlr4-runtime</include>
+                                    <include>org.apache.kafka:*</include>
+                                    
<include>mysql:mysql-connector-java</include>
+                                    
<include>com.zendesk:mysql-binlog-connector-java</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>com.google.guava:*</include>
+                                    
<include>com.esri.geometry:esri-geometry-api</include>
+                                    <include>com.zaxxer:HikariCP</include>
+                                    <!--  Include fixed version 
30.1.1-jre-16.0 of flink shaded guava  -->
+                                    
<include>org.apache.flink:flink-shaded-guava</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>
+                                        
org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.antlr</pattern>
+                                    <shadedPattern>
+                                        
org.apache.flink.cdc.connectors.shaded.org.antlr
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    <shadedPattern>
+                                        
org.apache.flink.cdc.connectors.shaded.com.fasterxml
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>
+                                        
org.apache.flink.cdc.connectors.shaded.com.google
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.esri.geometry</pattern>
+                                    
<shadedPattern>org.apache.flink.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.zaxxer</pattern>
+                                    <shadedPattern>
+                                        
org.apache.flink.cdc.connectors.shaded.com.zaxxer
+                                    </shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
                     </execution>
                 </executions>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                </configuration>
             </plugin>
         </plugins>
     </build>
diff --git 
a/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java 
b/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java
index f821ac0a2..bfaa2d529 100644
--- 
a/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java
+++ 
b/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class DataStreamJob {
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                 .hostname("localhost")
                 .port(3306)
@@ -45,10 +45,6 @@ public class DataStreamJob {
                 .print()
                 .setParallelism(1);
 
-        try {
-            env.execute();
-        } catch (Exception e) {
-            // ... unfortunately
-        }
+        env.execute();
     }
 }
diff --git a/tools/mig-test/datastream/datastream-3.3.0/pom.xml 
b/tools/mig-test/datastream/datastream-3.3.0/pom.xml
index cc65c6c78..63bcd21b9 100644
--- a/tools/mig-test/datastream/datastream-3.3.0/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.3.0/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <flink.version>1.19.1</flink.version>
         <flink.cdc.version>3.3.0</flink.cdc.version>
-        <debezium.version>1.9.7.Final</debezium.version>
+        <debezium.version>1.9.8.Final</debezium.version>
         <scala.binary.version>2.12</scala.binary.version>
         <slf4j.version>2.0.13</slf4j.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -136,20 +136,95 @@ limitations under the License.
                 </configuration>
             </plugin>
             <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
+                        <id>shade-flink</id>
                         <phase>package</phase>
                         <goals>
-                            <goal>single</goal>
+                            <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <!-- Shading test jar have bug in some previous version, so close this configuration here,
+                            see https://issues.apache.org/jira/browse/MSHADE-284 -->
+                            <shadeTestJar>false</shadeTestJar>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>
+                                ${project.basedir}/target/dependency-reduced-pom.xml
+                            </dependencyReducedPomLocation>
+                            <filters combine.children="append">
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>module-info.class</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <artifactSet>
+                                <includes>
+                                    <include>io.debezium:debezium-api</include>
+                                    <include>io.debezium:debezium-embedded</include>
+                                    <include>io.debezium:debezium-core</include>
+                                    <include>io.debezium:debezium-ddl-parser</include>
+                                    <include>io.debezium:debezium-connector-mysql</include>
+                                    <include>org.apache.flink:flink-connector-debezium</include>
+                                    <include>org.apache.flink:flink-connector-mysql-cdc</include>
+                                    <include>org.antlr:antlr4-runtime</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>mysql:mysql-connector-java</include>
+                                    <include>com.zendesk:mysql-binlog-connector-java</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>com.google.guava:*</include>
+                                    <include>com.esri.geometry:esri-geometry-api</include>
+                                    <include>com.zaxxer:HikariCP</include>
+                                    <!--  Include fixed version 30.1.1-jre-16.0 of flink shaded guava  -->
+                                    <include>org.apache.flink:flink-shaded-guava</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.antlr</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.antlr
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.fasterxml
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.google
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.esri.geometry</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.zaxxer</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.zaxxer
+                                    </shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
                     </execution>
                 </executions>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                </configuration>
             </plugin>
         </plugins>
     </build>
diff --git a/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java
index f821ac0a2..bfaa2d529 100644
--- a/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java
+++ b/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class DataStreamJob {
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                 .hostname("localhost")
                 .port(3306)
@@ -45,10 +45,6 @@ public class DataStreamJob {
                 .print()
                 .setParallelism(1);
 
-        try {
-            env.execute();
-        } catch (Exception e) {
-            // ... unfortunately
-        }
+        env.execute();
     }
 }
diff --git a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml
index e174d5583..6d73f0e04 100644
--- a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <flink.version>1.19.1</flink.version>
         <flink.cdc.version>3.4-SNAPSHOT</flink.cdc.version>
-        <debezium.version>1.9.7.Final</debezium.version>
+        <debezium.version>1.9.8.Final</debezium.version>
         <scala.binary.version>2.12</scala.binary.version>
         <slf4j.version>2.0.13</slf4j.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -136,20 +136,95 @@ limitations under the License.
                 </configuration>
             </plugin>
             <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
+                        <id>shade-flink</id>
                         <phase>package</phase>
                         <goals>
-                            <goal>single</goal>
+                            <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <!-- Shading test jar have bug in some previous version, so close this configuration here,
+                            see https://issues.apache.org/jira/browse/MSHADE-284 -->
+                            <shadeTestJar>false</shadeTestJar>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>
+                                ${project.basedir}/target/dependency-reduced-pom.xml
+                            </dependencyReducedPomLocation>
+                            <filters combine.children="append">
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>module-info.class</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <artifactSet>
+                                <includes>
+                                    <include>io.debezium:debezium-api</include>
+                                    <include>io.debezium:debezium-embedded</include>
+                                    <include>io.debezium:debezium-core</include>
+                                    <include>io.debezium:debezium-ddl-parser</include>
+                                    <include>io.debezium:debezium-connector-mysql</include>
+                                    <include>org.apache.flink:flink-connector-debezium</include>
+                                    <include>org.apache.flink:flink-connector-mysql-cdc</include>
+                                    <include>org.antlr:antlr4-runtime</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>mysql:mysql-connector-java</include>
+                                    <include>com.zendesk:mysql-binlog-connector-java</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>com.google.guava:*</include>
+                                    <include>com.esri.geometry:esri-geometry-api</include>
+                                    <include>com.zaxxer:HikariCP</include>
+                                    <!--  Include fixed version 30.1.1-jre-16.0 of flink shaded guava  -->
+                                    <include>org.apache.flink:flink-shaded-guava</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.antlr</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.org.antlr
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.fasterxml
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.google
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.esri.geometry</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.zaxxer</pattern>
+                                    <shadedPattern>
+                                        org.apache.flink.cdc.connectors.shaded.com.zaxxer
+                                    </shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
                     </execution>
                 </executions>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                </configuration>
             </plugin>
         </plugins>
     </build>
diff --git a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java
index f821ac0a2..bfaa2d529 100644
--- a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java
+++ b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class DataStreamJob {
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                 .hostname("localhost")
                 .port(3306)
@@ -45,10 +45,6 @@ public class DataStreamJob {
                 .print()
                 .setParallelism(1);
 
-        try {
-            env.execute();
-        } catch (Exception e) {
-            // ... unfortunately
-        }
+        env.execute();
     }
 }
diff --git a/tools/mig-test/datastream/run_migration_test.rb b/tools/mig-test/datastream/run_migration_test.rb
index deb16b0f0..951d043da 100644
--- a/tools/mig-test/datastream/run_migration_test.rb
+++ b/tools/mig-test/datastream/run_migration_test.rb
@@ -32,12 +32,18 @@ def exec_sql_source(sql)
   `mysql -h 127.0.0.1 -P#{SOURCE_PORT} -uroot --skip-password -e "USE #{DATABASE_NAME}; #{sql}"`
 end
 
+def extract_job_id(output)
+  current_job_id = output.split("\n").filter { _1.start_with?('Job has been submitted with JobID ') }.first&.split&.last
+  raise StandardError, "Failed to submit Flink job. Output: #{output}" unless current_job_id&.length == 32
+  current_job_id
+end
+
 def put_mystery_data(mystery)
   exec_sql_source("REPLACE INTO girl(id, name) VALUES (17, '#{mystery}');")
 end
 
 def ensure_mystery_data(mystery)
-  throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
+  raise StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
 end
 
 puts '   Waiting for source to start up...'
@@ -52,8 +58,8 @@ def test_migration_chore(from_version, to_version)
   # Clear previous savepoints and logs
   `rm -rf savepoints`
 
-  old_job_id = `#{FLINK_HOME}/bin/flink run -p 1 -c DataStreamJob --detached datastream-#{from_version}/target/datastream-job-#{from_version}-jar-with-dependencies.jar`.split.last
-  raise StandardError, 'Failed to submit Flink job' unless old_job_id.length == 32
+  old_output = `#{FLINK_HOME}/bin/flink run -p 1 -c DataStreamJob --detached datastream-#{from_version}/target/datastream-job-#{from_version}.jar`
+  old_job_id = extract_job_id(old_output)
 
   puts "Submitted job at #{from_version} as #{old_job_id}"
 
@@ -64,8 +70,8 @@ def test_migration_chore(from_version, to_version)
 
   puts `#{FLINK_HOME}/bin/flink stop --savepointPath #{Dir.pwd}/savepoints #{old_job_id}`
   savepoint_file = `ls savepoints`.split("\n").last
-  new_job_id = `#{FLINK_HOME}/bin/flink run --fromSavepoint #{Dir.pwd}/savepoints/#{savepoint_file} -p 1 -c DataStreamJob --detached datastream-#{to_version}/target/datastream-job-#{to_version}-jar-with-dependencies.jar`.split.last
-  raise StandardError, 'Failed to submit Flink job' unless new_job_id.length == 32
+  new_output = `#{FLINK_HOME}/bin/flink run --fromSavepoint #{Dir.pwd}/savepoints/#{savepoint_file} -p 1 -c DataStreamJob --detached datastream-#{to_version}/target/datastream-job-#{to_version}.jar`
+  new_job_id = extract_job_id(new_output)
 
   puts "Submitted job at #{to_version} as #{new_job_id}"
   random_string_2 = SecureRandom.hex(8)
@@ -94,45 +100,25 @@ def test_migration(from_version, to_version)
 end
 
 version_list = %w[3.2.0 3.2.1 3.3.0 3.4-SNAPSHOT]
-version_result = Hash.new('❓')
 @failures = []
 
-version_list.each_with_index do |old_version, old_index|
+new_version = version_list.last
+
+version_list.each do |old_version|
+  puts "-> Testing migrating from #{old_version} to latest snapshot."
   puts 'Restarting cluster...'
   `#{FLINK_HOME}/bin/stop-cluster.sh`
   `rm -rf #{FLINK_HOME}/log/flink-*.out`
+  puts 'Stopped cluster.'
   `#{FLINK_HOME}/bin/start-cluster.sh`
-  version_list.each_with_index do |new_version, new_index|
-    next if old_index > new_index
-
-    result = test_migration old_version, new_version
-    version_result[old_version + new_version] = result ? '✅' : '❌'
-    @failures << [old_version, new_version] unless result
-  end
-end
-
-printable_result = []
-printable_result << [''] + version_list
-version_list.each_with_index do |old_version, old_index|
-  table_line = [old_version]
-  version_list.each_with_index do |new_version, new_index|
-    table_line << if old_index > new_index
-                    ''
-                  else
-                    version_result[old_version + new_version]
-                  end
-  end
-  printable_result << table_line
-end
+  puts 'Started cluster.'
 
-begin
-  require 'terminal-table'
-  puts Terminal::Table.new rows: printable_result, title: 'Migration Test Result'
-rescue LoadError
-  puts 'Test summary: ', printable_result
+  result = test_migration old_version, new_version
+  @failures << [old_version, new_version] unless result
 end
-puts "✅ - Compatible, ❌ - Not compatible, ❓ - Target version doesn't support `--from-savepoint`"
 
-if @failures.filter { |_, new_version| new_version == version_list.last }.any?
+if @failures.any?
+  puts 'Some migration to snapshot version tests failed. Details: '
+  puts @failures
   abort 'Some migration to snapshot version tests failed.'
 end
diff --git a/tools/mig-test/run_migration_test.rb b/tools/mig-test/run_migration_test.rb
index bb111252c..358337762 100644
--- a/tools/mig-test/run_migration_test.rb
+++ b/tools/mig-test/run_migration_test.rb
@@ -37,7 +37,7 @@ def put_mystery_data(mystery)
 end
 
 def ensure_mystery_data(mystery)
-  throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
+  raise StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
 end
 
 def extract_job_id(output)
@@ -120,15 +120,14 @@ end
 version_list = case ARGV[0]
                when '1.19.1' then %w[3.2.0 3.2.1 3.3.0 3.4-SNAPSHOT]
                when '1.20.0' then %w[3.2.1 3.3.0 3.4-SNAPSHOT]
-               else []
+               else throw "Unknown Flink target version: #{ARGV[0]}."
                end
 
-version_result = Hash.new('❓')
 @failures = []
 
 new_version = version_list.last
 
-version_list.each_with_index do |old_version, old_index|
+version_list.each do |old_version|
   puts "-> Testing migrating from #{old_version} to latest snapshot."
   puts 'Restarting cluster...'
   `#{FLINK_HOME}/bin/stop-cluster.sh`
@@ -137,7 +136,6 @@ version_list.each_with_index do |old_version, old_index|
   puts 'Started cluster.'
 
   result = test_migration old_version, new_version
-  version_result[old_version + new_version] = result ? '✅' : '❌'
   @failures << "#{old_version} => #{new_version}" unless result
 end
 

Reply via email to