This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 1a9e7bc [fix](cdc) fix datetime precision and regular matching format
errors after turning on single-sink (#298)
1a9e7bc is described below
commit 1a9e7bc57ec7723025cad13476d1c569d7527d60
Author: Petrichor <[email protected]>
AuthorDate: Wed Jan 24 17:06:14 2024 +0800
[fix](cdc) fix datetime precision and regular matching format errors after
turning on single-sink (#298)
---
.../doris/flink/catalog/DorisTypeMapper.java | 2 +
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 7 +++-
.../doris/flink/tools/cdc/mysql/MysqlType.java | 40 +++++++++++++++++-
.../TestJsonDebeziumSchemaChangeImplV2.java | 48 ++++++++++++++++++++++
4 files changed, 93 insertions(+), 4 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index cc5fe4b..e125a30 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -70,6 +70,8 @@ public class DorisTypeMapper {
/** Max size of varchar type of Doris. */
public static final int MAX_VARCHAR_SIZE = 65533;
+ /* Max precision of datetime type of Doris. */
+ public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;
public static DataType toFlinkType(
String columnName, String columnType, int precision, int scale) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index e153039..e71ed51 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -80,7 +80,7 @@ public abstract class DatabaseSync {
protected String tablePrefix;
protected String tableSuffix;
protected boolean singleSink;
- private Map<String, String> tableMapping = new HashMap<>();
+ private final Map<String, String> tableMapping = new HashMap<>();
public abstract void registerDriver() throws SQLException;
@@ -93,7 +93,7 @@ public abstract class DatabaseSync {
/** Get the prefix of a specific tableList, for example, mysql is
database, oracle is schema. */
public abstract String getTableListPrefix();
- public DatabaseSync() throws SQLException {
+ protected DatabaseSync() throws SQLException {
registerDriver();
}
@@ -315,6 +315,9 @@ public abstract class DatabaseSync {
.collect(Collectors.joining("|"));
} else {
// includingTablePattern and ^excludingPattern
+ if (includingTables == null) {
+ includingTables = ".*";
+ }
String includingPattern =
String.format("(%s)\\.(%s)", getTableListPrefix(),
includingTables);
if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
index 704b8b3..60a5eda 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
@@ -17,11 +17,20 @@
package org.apache.doris.flink.tools.cdc.mysql;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;
import org.apache.doris.flink.catalog.doris.DorisType;
+import static
org.apache.doris.flink.catalog.DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION;
+
public class MysqlType {
+
+ // MySQL driver returns width of timestamp types instead of precision.
+ // 19 characters are used for zero-precision timestamps while others
+ // require 19 + precision + 1 characters with the additional character
+ // required for the decimal separator.
+ private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
private static final String BIT = "BIT";
private static final String BOOLEAN = "BOOLEAN";
private static final String BOOL = "BOOL";
@@ -145,8 +154,35 @@ public class MysqlType {
return DorisType.DATE_V2;
case DATETIME:
case TIMESTAMP:
- int dtScale = length > 19 ? length - 20 : 0;
- return String.format("%s(%s)", DorisType.DATETIME_V2,
Math.min(dtScale, 6));
+ // default precision is 0
+ // see
https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
+ if (length == null
+ || length <= 0
+ || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
+ return String.format("%s(%s)", DorisType.DATETIME_V2, 0);
+ } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) {
+ // Timestamp with a fraction of seconds.
+ // For example, 2024-01-01 01:01:01.1
+ // The decimal point will occupy 1 character.
+ // Thus, the length of the timestamp is 21.
+ return String.format(
+ "%s(%s)",
+ DorisType.DATETIME_V2,
+ Math.min(
+ length -
ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1,
+ MAX_SUPPORTED_DATE_TIME_PRECISION));
+ } else if (length <= TimestampType.MAX_PRECISION) {
+ // For Debezium JSON data, the timestamp/datetime length
ranges from 0 to 9.
+ return String.format(
+ "%s(%s)",
+ DorisType.DATETIME_V2,
+ Math.min(length,
MAX_SUPPORTED_DATE_TIME_PRECISION));
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported length: "
+ + length
+ + " for MySQL TIMESTAMP/DATETIME types");
+ }
case CHAR:
case VARCHAR:
Preconditions.checkNotNull(length);
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 11df3e0..8aca521 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
@@ -267,4 +268,51 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
Assert.assertEquals("age4",
tableSchema.getFields().get("age4").getName());
schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName);
}
+
+ @Test
+ public void testDateTimeFullOrigin() throws JsonProcessingException {
+ Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
+ srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
+ srcFiledSchemaMap.put(
+ "test_dt_0", new FieldSchema("test_dt_0", "DATETIMEV2(0)",
null, null));
+ srcFiledSchemaMap.put(
+ "test_dt_1", new FieldSchema("test_dt_1", "DATETIMEV2(1)",
null, null));
+ srcFiledSchemaMap.put(
+ "test_dt_3", new FieldSchema("test_dt_3", "DATETIMEV2(3)",
null, null));
+ srcFiledSchemaMap.put(
+ "test_dt_6", new FieldSchema("test_dt_6", "DATETIMEV2(6)",
null, null));
+ srcFiledSchemaMap.put(
+ "test_ts_0", new FieldSchema("test_ts_0", "DATETIMEV2(0)",
null, null));
+ srcFiledSchemaMap.put(
+ "test_ts_1",
+ new FieldSchema("test_ts_1", "DATETIMEV2(1)",
"current_timestamp", null));
+ srcFiledSchemaMap.put(
+ "test_ts_3",
+ new FieldSchema("test_ts_3", "DATETIMEV2(3)",
"current_timestamp", null));
+ srcFiledSchemaMap.put(
+ "test_ts_6",
+ new FieldSchema("test_ts_6", "DATETIMEV2(6)",
"current_timestamp", null));
+
+ schemaChange.setSourceConnector("mysql");
+ String columnsString =
+
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValu
[...]
+ JsonNode columns = objectMapper.readTree(columnsString);
+ schemaChange.fillOriginSchema(columns);
+ Map<String, FieldSchema> originFieldSchemaMap =
schemaChange.getOriginFieldSchemaMap();
+
+ Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
+ originFieldSchemaMap.entrySet().iterator();
+ for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
+ FieldSchema srcFiledSchema = entry.getValue();
+ Entry<String, FieldSchema> originField =
originFieldSchemaIterator.next();
+
+ Assert.assertEquals(entry.getKey(), originField.getKey());
+ Assert.assertEquals(srcFiledSchema.getName(),
originField.getValue().getName());
+ Assert.assertEquals(
+ srcFiledSchema.getTypeString(),
originField.getValue().getTypeString());
+ Assert.assertEquals(
+ srcFiledSchema.getDefaultValue(),
originField.getValue().getDefaultValue());
+ Assert.assertEquals(srcFiledSchema.getComment(),
originField.getValue().getComment());
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]