This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4e4d2b8ee5 [Hotfix][MySQL-CDC] Fix read gbk varchar chinese garbled characters (#7046)
4e4d2b8ee5 is described below
commit 4e4d2b8ee5fab005886e22e4481e10462d67fa6f
Author: hailin0 <[email protected]>
AuthorDate: Tue Jun 25 13:36:30 2024 +0800
[Hotfix][MySQL-CDC] Fix read gbk varchar chinese garbled characters (#7046)
---
.../config/CustomMySqlConnectionConfiguration.java | 56 ++++++++++++++++++++++
.../reader/fetch/MySqlSourceFetchTaskContext.java | 4 +-
.../cdc/mysql/utils/MySqlConnectionUtils.java | 4 +-
.../seatunnel/cdc/mysql/utils/MySqlUtils.java | 46 ------------------
.../src/test/resources/ddl/mysql_cdc.sql | 4 +-
5 files changed, 63 insertions(+), 51 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/CustomMySqlConnectionConfiguration.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/CustomMySqlConnectionConfiguration.java
new file mode 100644
index 0000000000..4e8c2d1eb8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/CustomMySqlConnectionConfiguration.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static io.debezium.connector.mysql.MySqlConnectorConfig.JDBC_DRIVER;
+
+public class CustomMySqlConnectionConfiguration
+ extends MySqlConnection.MySqlConnectionConfiguration {
+
+ protected static final String URL_PATTERN =
+ "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}";
+
+ private final JdbcConnection.ConnectionFactory connectionFactory;
+
+ public CustomMySqlConnectionConfiguration(Configuration config) {
+ super(config);
+ String driverClassName =
+ config.getString(JDBC_DRIVER.name(), JDBC_DRIVER.defaultValueAsString());
+ connectionFactory =
+ JdbcConnection.patternBasedFactory(
+ URL_PATTERN, driverClassName, getClass().getClassLoader());
+ }
+
+ @Override
+ public JdbcConnection.ConnectionFactory factory() {
+ return new JdbcConnection.ConnectionFactory() {
+ @Override
+ public Connection connect(JdbcConfiguration config) throws SQLException {
+ return connectionFactory.connect(config);
+ }
+ };
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index 932fb7ef35..a67bc30dcc 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
import org.apache.kafka.connect.data.Struct;
@@ -120,7 +121,8 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
this.topicSelector =
MySqlTopicSelector.defaultSelector(connectorConfig);
this.databaseSchema =
- MySqlUtils.createMySqlDatabaseSchema(connectorConfig, tableIdCaseInsensitive);
+ MySqlConnectionUtils.createMySqlDatabaseSchema(
+ connectorConfig, tableIdCaseInsensitive);
this.offsetContext =
loadStartingOffsetState(
new MySqlOffsetContext.Loader(connectorConfig), sourceSplitBase);
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
index 3a63c5d090..d38553677c 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.CustomMySqlConnectionConfiguration;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
@@ -44,8 +45,7 @@ public class MySqlConnectionUtils {
/** Creates a new {@link MySqlConnection}, but not open the connection. */
public static MySqlConnection createMySqlConnection(Configuration dbzConfiguration) {
- return new MySqlConnection(
- new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
+ return new MySqlConnection(new CustomMySqlConnectionConfiguration(dbzConfiguration));
}
/** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index 032f185d0c..cff223f676 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -24,19 +24,11 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogO
import org.apache.kafka.connect.source.SourceRecord;
-import io.debezium.connector.mysql.MySqlConnectorConfig;
-import io.debezium.connector.mysql.MySqlDatabaseSchema;
-import io.debezium.connector.mysql.MySqlTopicSelector;
-import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.jdbc.JdbcConnection;
-import io.debezium.jdbc.JdbcValueConverters;
-import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
-import io.debezium.schema.TopicSelector;
-import io.debezium.util.SchemaNameAdjuster;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
@@ -342,44 +334,6 @@ public class MySqlUtils {
return getSplitType(primaryKeys.get(0), dbzConnectorConfig);
}
- /** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */
- public static MySqlDatabaseSchema createMySqlDatabaseSchema(
- MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) {
- TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
- SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
- MySqlValueConverters valueConverters = getValueConverters(dbzMySqlConfig);
- return new MySqlDatabaseSchema(
- dbzMySqlConfig,
- valueConverters,
- topicSelector,
- schemaNameAdjuster,
- isTableIdCaseSensitive);
- }
-
- private static MySqlValueConverters getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
- TemporalPrecisionMode timePrecisionMode = dbzMySqlConfig.getTemporalPrecisionMode();
- JdbcValueConverters.DecimalMode decimalMode = dbzMySqlConfig.getDecimalMode();
- String bigIntUnsignedHandlingModeStr =
- dbzMySqlConfig
- .getConfig()
- .getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
- MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode =
- MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
- bigIntUnsignedHandlingModeStr);
- JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
- bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
-
- boolean timeAdjusterEnabled =
- dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
- return new MySqlValueConverters(
- decimalMode,
- timePrecisionMode,
- bigIntUnsignedMode,
- dbzMySqlConfig.binaryHandlingMode(),
- timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x,
- MySqlValueConverters::defaultParsingErrorHandler);
- }
-
public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
return getBinlogPosition(dataRecord.sourceOffset());
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
index b909f9aacd..1103634162 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -50,7 +50,7 @@ CREATE TABLE mysql_cdc_e2e_source_table
`f_mediumtext` mediumtext,
`f_text` text,
`f_tinytext` tinytext,
- `f_varchar` varchar(100) DEFAULT NULL,
+ `f_varchar` varchar(100) collate gbk_bin DEFAULT NULL,
`f_date` date DEFAULT NULL,
`f_datetime` datetime DEFAULT NULL,
`f_timestamp` timestamp NULL DEFAULT NULL,
@@ -333,7 +333,7 @@ VALUES ( 1,
0x616263740000000000000000000000000000000000000000000000000000000000
0x68656C6C6F,
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456,
654321, 1234567, 7654321, 1234567, 7654321,
123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long
text field', 'This is a medium text field',
- 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',
+ 'This is a text field', 'This is a tiny text field', '中文测试', '2022-04-27', '2022-04-27 14:30:00',
'2023-04-27 11:08:40', 1,
b'0101010101010101010101010101010101010101010101010101010101010101', 'C',
'enum2',
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A,
'This is a long varchar field',
12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ),