This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.2 by this push:
     new 12c36f1cc [FLINK-36656][mysql] Fix type conversion failure for 
newly-added sharding table with mysql boolean type (#3684)
12c36f1cc is described below

commit 12c36f1ccdd80bdcf068f795e6c8943862a674d5
Author: Leonard Xu <[email protected]>
AuthorDate: Tue Nov 19 23:32:28 2024 -0800

    [FLINK-36656][mysql] Fix type conversion failure for newly-added sharding 
table with mysql boolean type (#3684)
---
 .../MySqlDeserializationConverterFactory.java      |  19 ++
 .../mysql/table/MySqlConnectorITCase.java          |  72 -----
 .../table/MySqlConnectorShardingTableITCase.java   | 359 +++++++++++++++++++++
 3 files changed, 378 insertions(+), 72 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java
index ff5f69a06..50eccc176 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java
@@ -53,6 +53,8 @@ public class MySqlDeserializationConverterFactory {
             public Optional<DeserializationRuntimeConverter> 
createUserDefinedConverter(
                     LogicalType logicalType, ZoneId serverTimeZone) {
                 switch (logicalType.getTypeRoot()) {
+                    case TINYINT:
+                        return createTinyIntConverter();
                     case CHAR:
                     case VARCHAR:
                         return createStringConverter();
@@ -148,6 +150,23 @@ public class MySqlDeserializationConverterFactory {
         }
     }
 
+    private static Optional<DeserializationRuntimeConverter> 
createTinyIntConverter() {
+
+        return Optional.of(
+                new DeserializationRuntimeConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) throws 
Exception {
+                        if (dbzObj instanceof Boolean) {
+                            return dbzObj == Boolean.TRUE ? (byte) 1 : (byte) 
0;
+                        } else {
+                            return Byte.parseByte(dbzObj.toString());
+                        }
+                    }
+                });
+    }
+
     private static boolean hasFamily(LogicalType logicalType, 
LogicalTypeFamily family) {
         return logicalType.getTypeRoot().getFamilies().contains(family);
     }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index c6f66de05..999ae2980 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -1592,78 +1592,6 @@ public class MySqlConnectorITCase extends 
MySqlSourceTestBase {
         jobClient.cancel().get();
     }
 
-    @Test
-    public void testShardingTablesWithInconsistentSchema() throws Exception {
-        userDatabase1.createAndInitialize();
-        userDatabase2.createAndInitialize();
-        String sourceDDL =
-                String.format(
-                        "CREATE TABLE `user` ("
-                                + " `id` DECIMAL(20, 0) NOT NULL,"
-                                + " name STRING,"
-                                + " address STRING,"
-                                + " phone_number STRING,"
-                                + " email STRING,"
-                                + " age INT,"
-                                + " primary key (`id`) not enforced"
-                                + ") WITH ("
-                                + " 'connector' = 'mysql-cdc',"
-                                + " 'hostname' = '%s',"
-                                + " 'port' = '%s',"
-                                + " 'username' = '%s',"
-                                + " 'password' = '%s',"
-                                + " 'database-name' = '%s',"
-                                + " 'table-name' = '%s',"
-                                + " 'scan.incremental.snapshot.enabled' = 
'%s',"
-                                + " 'server-time-zone' = 'UTC',"
-                                + " 'server-id' = '%s',"
-                                + " 'scan.incremental.snapshot.chunk.size' = 
'%s'"
-                                + ")",
-                        MYSQL_CONTAINER.getHost(),
-                        MYSQL_CONTAINER.getDatabasePort(),
-                        userDatabase1.getUsername(),
-                        userDatabase1.getPassword(),
-                        String.format(
-                                "(%s|%s)",
-                                userDatabase1.getDatabaseName(), 
userDatabase2.getDatabaseName()),
-                        "user_table_.*",
-                        incrementalSnapshot,
-                        getServerId(),
-                        getSplitSize());
-        tEnv.executeSql(sourceDDL);
-
-        // async submit job
-        TableResult result = tEnv.executeSql("SELECT * FROM `user`");
-
-        CloseableIterator<Row> iterator = result.collect();
-        waitForSnapshotStarted(iterator);
-
-        try (Connection connection = userDatabase1.getJdbcConnection();
-                Statement statement = connection.createStatement()) {
-            statement.execute("UPDATE user_table_1_1 SET email = 
'[email protected]' WHERE id=111;");
-        }
-
-        try (Connection connection = userDatabase2.getJdbcConnection();
-                Statement statement = connection.createStatement()) {
-            statement.execute("UPDATE user_table_2_2 SET age = 20 WHERE 
id=221;");
-        }
-
-        String[] expected =
-                new String[] {
-                    "+I[111, user_111, Shanghai, 123567891234, 
[email protected], null]",
-                    "-U[111, user_111, Shanghai, 123567891234, 
[email protected], null]",
-                    "+U[111, user_111, Shanghai, 123567891234, 
[email protected], null]",
-                    "+I[121, user_121, Shanghai, 123567891234, null, null]",
-                    "+I[211, user_211, Shanghai, 123567891234, null, null]",
-                    "+I[221, user_221, Shanghai, 123567891234, null, 18]",
-                    "-U[221, user_221, Shanghai, 123567891234, null, 18]",
-                    "+U[221, user_221, Shanghai, 123567891234, null, 20]",
-                };
-
-        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, 
expected.length));
-        result.getJobClient().get().cancel().get();
-    }
-
     @Test
     public void testStartupFromSpecificBinlogFilePos() throws Exception {
         inventoryDatabase.createAndInitialize();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorShardingTableITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorShardingTableITCase.java
new file mode 100644
index 000000000..9297465dc
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorShardingTableITCase.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.table;
+
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Stream;
+
+/** Integration tests for MySQL shardding tables. */
+@RunWith(Parameterized.class)
+public class MySqlConnectorShardingTableITCase extends MySqlSourceTestBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MySqlConnectorShardingTableITCase.class);
+
+    private static final String TEST_USER = "mysqluser";
+    private static final String TEST_PASSWORD = "mysqlpw";
+
+    private static final MySqlContainer MYSQL8_CONTAINER =
+            createMySqlContainer(MySqlVersion.V8_0, 
"docker/server-gtids/expire-seconds/my.cnf");
+
+    private final UniqueDatabase fullTypesMySql57Database =
+            new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", TEST_USER, 
TEST_PASSWORD);
+
+    private final UniqueDatabase userDatabase1 =
+            new UniqueDatabase(MYSQL_CONTAINER, "user_1", TEST_USER, 
TEST_PASSWORD);
+    private final UniqueDatabase userDatabase2 =
+            new UniqueDatabase(MYSQL_CONTAINER, "user_2", TEST_USER, 
TEST_PASSWORD);
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+    private final StreamTableEnvironment tEnv =
+            StreamTableEnvironment.create(
+                    env, 
EnvironmentSettings.newInstance().inStreamingMode().build());
+
+    // enable the incrementalSnapshot (i.e: The new source MySqlParallelSource)
+    private final boolean incrementalSnapshot;
+
+    public MySqlConnectorShardingTableITCase(boolean incrementalSnapshot) {
+        this.incrementalSnapshot = incrementalSnapshot;
+    }
+
+    @Parameterized.Parameters(name = "incrementalSnapshot: {0}")
+    public static Object[] parameters() {
+        return new Object[][] {new Object[] {false}, new Object[] {true}};
+    }
+
+    @BeforeClass
+    public static void beforeClass() {
+        LOG.info("Starting MySql8 containers...");
+        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
+        LOG.info("Container MySql8 is started.");
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        LOG.info("Stopping MySql8 containers...");
+        MYSQL8_CONTAINER.stop();
+        LOG.info("Container MySql8 is stopped.");
+    }
+
+    @Before
+    public void before() {
+        TestValuesTableFactory.clearAllData();
+        if (incrementalSnapshot) {
+            env.setParallelism(DEFAULT_PARALLELISM);
+            env.enableCheckpointing(200);
+        } else {
+            env.setParallelism(1);
+        }
+    }
+
+    @Test
+    public void testShardingTablesWithTinyInt1() throws Exception {
+        fullTypesMySql57Database.createAndInitialize();
+        try (Connection connection = 
fullTypesMySql57Database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(String.format("USE %s", 
fullTypesMySql57Database.getDatabaseName()));
+            statement.execute(
+                    "CREATE TABLE sharding_table_1("
+                            + "id BIGINT,"
+                            + "status BOOLEAN," // will be recognized as 
tinyint(1) in debezium as
+                            // it comes from show table command
+                            + " PRIMARY KEY (id) "
+                            + ")");
+            statement.execute("INSERT INTO sharding_table_1 values(1, 
true),(2, false)");
+        }
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE sharding_tables (\n"
+                                + "`id` BIGINT,"
+                                + "status TINYINT,"
+                                + "primary key (`id`) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mysql-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'server-id' = '%s',"
+                                + " 'server-time-zone' = 'UTC',"
+                                + " 'scan.incremental.snapshot.chunk.size' = 
'%s'"
+                                + ")",
+                        MYSQL_CONTAINER.getHost(),
+                        MYSQL_CONTAINER.getDatabasePort(),
+                        fullTypesMySql57Database.getUsername(),
+                        fullTypesMySql57Database.getPassword(),
+                        fullTypesMySql57Database.getDatabaseName(),
+                        "sharding_table_.*",
+                        incrementalSnapshot,
+                        getServerId(),
+                        getSplitSize());
+        String sinkDDL =
+                "CREATE TABLE sink ("
+                        + " `id` BIGINT NOT NULL,"
+                        + " status TINYINT,"
+                        + " primary key (`id`) not enforced"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ")";
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+
+        // async submit job
+        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM 
sharding_tables");
+
+        try (Connection connection = 
fullTypesMySql57Database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("INSERT INTO sharding_table_1 values(3, 
true),(4, false)");
+        }
+        // wait for snapshot finished and begin binlog
+        waitForSinkSize("sink", 4);
+
+        try (Connection connection = 
fullTypesMySql57Database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(String.format("USE %s", 
fullTypesMySql57Database.getDatabaseName()));
+            statement.execute(
+                    "CREATE TABLE sharding_table_2("
+                            + "id BIGINT,"
+                            + "status BOOLEAN," // will be recognized as 
boolean in debezium as it
+                            // comes from binlog
+                            + " PRIMARY KEY (id) "
+                            + ")");
+            statement.execute("INSERT INTO sharding_table_2 values(5, 
true),(6, false)");
+        }
+
+        waitForSinkSize("sink", 6);
+        String[] expected =
+                new String[] {
+                    "+I[1, 1]", "+I[2, 0]", "+I[3, 1]", "+I[4, 0]", "+I[5, 
1]", "+I[6, 0]",
+                };
+        List<String> actual = TestValuesTableFactory.getResults("sink");
+        assertEqualsInAnyOrder(Arrays.asList(expected), actual);
+        result.getJobClient().get().cancel().get();
+    }
+
+    @Test
+    public void testShardingTablesWithInconsistentSchema() throws Exception {
+        userDatabase1.createAndInitialize();
+        userDatabase2.createAndInitialize();
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE `user` ("
+                                + " `id` DECIMAL(20, 0) NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " email STRING,"
+                                + " age INT,"
+                                + " primary key (`id`) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mysql-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'server-time-zone' = 'UTC',"
+                                + " 'server-id' = '%s',"
+                                + " 'scan.incremental.snapshot.chunk.size' = 
'%s'"
+                                + ")",
+                        MYSQL_CONTAINER.getHost(),
+                        MYSQL_CONTAINER.getDatabasePort(),
+                        userDatabase1.getUsername(),
+                        userDatabase1.getPassword(),
+                        String.format(
+                                "(%s|%s)",
+                                userDatabase1.getDatabaseName(), 
userDatabase2.getDatabaseName()),
+                        "user_table_.*",
+                        incrementalSnapshot,
+                        getServerId(),
+                        getSplitSize());
+        tEnv.executeSql(sourceDDL);
+
+        // async submit job
+        TableResult result = tEnv.executeSql("SELECT * FROM `user`");
+
+        CloseableIterator<Row> iterator = result.collect();
+        waitForSnapshotStarted(iterator);
+
+        try (Connection connection = userDatabase1.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("UPDATE user_table_1_1 SET email = 
'[email protected]' WHERE id=111;");
+        }
+
+        try (Connection connection = userDatabase2.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("UPDATE user_table_2_2 SET age = 20 WHERE 
id=221;");
+        }
+
+        String[] expected =
+                new String[] {
+                    "+I[111, user_111, Shanghai, 123567891234, 
[email protected], null]",
+                    "-U[111, user_111, Shanghai, 123567891234, 
[email protected], null]",
+                    "+U[111, user_111, Shanghai, 123567891234, 
[email protected], null]",
+                    "+I[121, user_121, Shanghai, 123567891234, null, null]",
+                    "+I[211, user_211, Shanghai, 123567891234, null, null]",
+                    "+I[221, user_221, Shanghai, 123567891234, null, 18]",
+                    "-U[221, user_221, Shanghai, 123567891234, null, 18]",
+                    "+U[221, user_221, Shanghai, 123567891234, null, 20]",
+                };
+
+        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, 
expected.length));
+        result.getJobClient().get().cancel().get();
+    }
+
+    // 
------------------------------------------------------------------------------------
+
+    private String getServerId() {
+        final Random random = new Random();
+        int serverId = random.nextInt(100) + 5400;
+        if (incrementalSnapshot) {
+            return serverId + "-" + (serverId + env.getParallelism());
+        }
+        return String.valueOf(serverId);
+    }
+
+    protected String getServerId(int base) {
+        if (incrementalSnapshot) {
+            return base + "-" + (base + DEFAULT_PARALLELISM);
+        }
+        return String.valueOf(base);
+    }
+
+    private int getSplitSize() {
+        if (incrementalSnapshot) {
+            // test parallel read
+            return 4;
+        }
+        return 0;
+    }
+
+    private static String buildColumnsDDL(
+            String columnPrefix, int start, int end, String dataType) {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = start; i < end; i++) {
+            stringBuilder.append(columnPrefix).append(i).append(" 
").append(dataType).append(",");
+        }
+        return stringBuilder.toString();
+    }
+
+    private static String getIntegerSeqString(int start, int end) {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = start; i < end - 1; i++) {
+            stringBuilder.append(i).append(", ");
+        }
+        stringBuilder.append(end - 1);
+        return stringBuilder.toString();
+    }
+
+    private static void waitForSnapshotStarted(String sinkName) throws 
InterruptedException {
+        while (sinkSize(sinkName) == 0) {
+            Thread.sleep(100);
+        }
+    }
+
+    private static void waitForSinkSize(String sinkName, int expectedSize)
+            throws InterruptedException {
+        while (sinkSize(sinkName) < expectedSize) {
+            Thread.sleep(100);
+        }
+    }
+
+    private static int sinkSize(String sinkName) {
+        synchronized (TestValuesTableFactory.class) {
+            try {
+                return TestValuesTableFactory.getRawResults(sinkName).size();
+            } catch (IllegalArgumentException e) {
+                // job is not started yet
+                return 0;
+            }
+        }
+    }
+
+    private static List<String> fetchRows(Iterator<Row> iter, int size) {
+        List<String> rows = new ArrayList<>(size);
+        while (size > 0 && iter.hasNext()) {
+            Row row = iter.next();
+            rows.add(row.toString());
+            size--;
+        }
+        return rows;
+    }
+
+    private static void waitForSnapshotStarted(CloseableIterator<Row> 
iterator) throws Exception {
+        while (!iterator.hasNext()) {
+            Thread.sleep(100);
+        }
+    }
+}

Reply via email to