This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 4038842 [Improve](mysqlSync)Add the configuration of whether to
synchronize the default value (#152)
4038842 is described below
commit 403884239c9e7de58deef82bc26599d07f68e0d2
Author: DongLiang-0 <[email protected]>
AuthorDate: Fri Jul 21 15:07:51 2023 +0800
[Improve](mysqlSync)Add the configuration of whether to synchronize the
default value (#152)
---
.../DorisJsonDebeziumDeserializationSchema.java | 185 +++++++++++++++++++++
.../sink/writer/JsonDebeziumSchemaSerializer.java | 2 +-
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 3 +-
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 5 +-
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 16 +-
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 3 +-
.../tools/cdc/CdcOraclelSyncDatabaseCase.java | 3 +-
7 files changed, 207 insertions(+), 10 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java
new file mode 100644
index 0000000..9c54ade
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.deserialization;
+
+import org.apache.doris.flink.exception.DorisException;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
+import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
+import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
+import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Converts the value of a Debezium {@link SourceRecord} into a schema-less JSON string.
+ *
+ * <p>Struct fields are read with {@link Struct#getWithoutDefault(String)}, so a field whose value is
+ * null is emitted as JSON {@code null} (when its schema is optional) instead of being replaced by the
+ * schema's default value. {@code MysqlDatabaseSync} selects this schema when the
+ * {@code ignore-default-value} option is set; it is currently only used for MySQL sync.
+ */
+public class DorisJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
+
+    /** Builds DecimalNodes from BigDecimal values as-is (no normalization of the decimal value). */
+    private static final JsonNodeFactory JSON_NODE_FACTORY = JsonNodeFactory.withExactBigDecimals(true);
+    private final ObjectMapper objectMapper;
+
+    public DorisJsonDebeziumDeserializationSchema() {
+        objectMapper = new ObjectMapper();
+    }
+
+    /**
+     * Serializes the record value to JSON and emits it as a single string.
+     *
+     * @param sourceRecord the Debezium record; only its value and value schema are used
+     * @param collector receives the JSON text
+     * @throws Exception if the value cannot be converted or serialized
+     */
+    @Override
+    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
+        JsonNode jsonValue = convertToJson(sourceRecord.valueSchema(), sourceRecord.value());
+        // Jackson writes UTF-8 bytes; decode with the same charset rather than the platform default,
+        // otherwise non-ASCII column values are corrupted on JVMs whose default charset is not UTF-8.
+        byte[] bytes = objectMapper.writeValueAsBytes(jsonValue);
+        collector.collect(new String(bytes, StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Recursively converts a Connect value into a Jackson tree.
+     *
+     * @param schema the value's schema, or {@code null} to infer the type from the value's Java class
+     * @param value the value to convert; may be {@code null}
+     * @return the JSON node, or {@code null} when both {@code schema} and {@code value} are null
+     * @throws DorisException if a required value is null, the value's Java type does not match the
+     *     schema type, or the type has no JSON conversion
+     */
+    private JsonNode convertToJson(Schema schema, Object value) throws DorisException {
+        if (value == null) {
+            if (schema == null) {
+                // Any schema is valid and we don't have a default, so treat this as an optional schema.
+                return null;
+            }
+            if (schema.isOptional()) {
+                return JSON_NODE_FACTORY.nullNode();
+            }
+            throw new DorisException(
+                    "Conversion error: null value for field that is required and has no default value");
+        }
+
+        final Schema.Type schemaType = resolveSchemaType(schema, value);
+        try {
+            switch (schemaType) {
+                case INT8:
+                    return JSON_NODE_FACTORY.numberNode((Byte) value);
+                case INT16:
+                    return JSON_NODE_FACTORY.numberNode((Short) value);
+                case INT32:
+                    return JSON_NODE_FACTORY.numberNode((Integer) value);
+                case INT64:
+                    return JSON_NODE_FACTORY.numberNode((Long) value);
+                case FLOAT32:
+                    return JSON_NODE_FACTORY.numberNode((Float) value);
+                case FLOAT64:
+                    return JSON_NODE_FACTORY.numberNode((Double) value);
+                case BOOLEAN:
+                    return JSON_NODE_FACTORY.booleanNode((Boolean) value);
+                case STRING:
+                    return JSON_NODE_FACTORY.textNode(((CharSequence) value).toString());
+                case BYTES:
+                    return convertBytes(value);
+                case ARRAY:
+                    return convertArray(schema, (Collection<?>) value);
+                case MAP:
+                    return convertMap(schema, (Map<?, ?>) value);
+                case STRUCT:
+                    return convertStruct(schema, (Struct) value);
+                default:
+                    throw new DorisException("Couldn't convert " + value + " to JSON.");
+            }
+        } catch (ClassCastException e) {
+            String schemaTypeStr = (schema != null) ? schema.type().toString() : "unknown schema";
+            // Keep the cast failure as the cause so the offending conversion can be traced.
+            throw new DorisException("Invalid type for " + schemaTypeStr + ": " + value.getClass(), e);
+        }
+    }
+
+    /** Returns the schema's type, or infers it from the value's Java class when no schema is given. */
+    private static Schema.Type resolveSchemaType(Schema schema, Object value) throws DorisException {
+        if (schema != null) {
+            return schema.type();
+        }
+        Schema.Type inferred = ConnectSchema.schemaType(value.getClass());
+        if (inferred == null) {
+            throw new DorisException(
+                    "Java class " + value.getClass() + " does not have corresponding schema type.");
+        }
+        return inferred;
+    }
+
+    /** Converts a BYTES value: raw bytes become a binary node, BigDecimal values a numeric node. */
+    private static JsonNode convertBytes(Object value) throws DorisException {
+        if (value instanceof byte[]) {
+            return JSON_NODE_FACTORY.binaryNode((byte[]) value);
+        }
+        if (value instanceof ByteBuffer) {
+            // Copy only the readable region through a duplicate (leaves the caller's position intact):
+            // ByteBuffer#array() ignores position/limit/arrayOffset and throws for read-only or
+            // direct buffers.
+            ByteBuffer buffer = ((ByteBuffer) value).duplicate();
+            byte[] bytes = new byte[buffer.remaining()];
+            buffer.get(bytes);
+            return JSON_NODE_FACTORY.binaryNode(bytes);
+        }
+        if (value instanceof BigDecimal) {
+            // BigDecimal under BYTES (presumably Connect's Decimal logical type) is written as a number.
+            return JSON_NODE_FACTORY.numberNode((BigDecimal) value);
+        }
+        throw new DorisException("Invalid type for bytes type: " + value.getClass());
+    }
+
+    /** Converts each element using the array's value schema, or inferred types when schema is null. */
+    private JsonNode convertArray(Schema schema, Collection<?> collection) throws DorisException {
+        Schema valueSchema = schema == null ? null : schema.valueSchema();
+        ArrayNode list = JSON_NODE_FACTORY.arrayNode();
+        for (Object elem : collection) {
+            list.add(convertToJson(valueSchema, elem));
+        }
+        return list;
+    }
+
+    /**
+     * Converts a map. Maps with string keys become a JSON object; any other map is encoded as an
+     * array of {@code [key, value]} pairs.
+     */
+    private JsonNode convertMap(Schema schema, Map<?, ?> map) throws DorisException {
+        boolean objectMode;
+        if (schema == null) {
+            // Without a schema, use object encoding only if every key is a String.
+            objectMode = true;
+            for (Object key : map.keySet()) {
+                if (!(key instanceof String)) {
+                    objectMode = false;
+                    break;
+                }
+            }
+        } else {
+            objectMode = schema.keySchema().type() == Schema.Type.STRING;
+        }
+        Schema keySchema = schema == null ? null : schema.keySchema();
+        Schema valueSchema = schema == null ? null : schema.valueSchema();
+
+        if (objectMode) {
+            ObjectNode obj = JSON_NODE_FACTORY.objectNode();
+            for (Map.Entry<?, ?> entry : map.entrySet()) {
+                JsonNode mapKey = convertToJson(keySchema, entry.getKey());
+                JsonNode mapValue = convertToJson(valueSchema, entry.getValue());
+                obj.set(mapKey.asText(), mapValue);
+            }
+            return obj;
+        }
+        ArrayNode list = JSON_NODE_FACTORY.arrayNode();
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+            JsonNode mapKey = convertToJson(keySchema, entry.getKey());
+            JsonNode mapValue = convertToJson(valueSchema, entry.getValue());
+            list.add(JSON_NODE_FACTORY.arrayNode().add(mapKey).add(mapValue));
+        }
+        return list;
+    }
+
+    /**
+     * Converts a struct field by field. Values are read with {@code getWithoutDefault}, so a null
+     * field is not replaced by its schema default.
+     */
+    private JsonNode convertStruct(Schema schema, Struct struct) throws DorisException {
+        if (!struct.schema().equals(schema)) {
+            throw new DorisException("Mismatching schema.");
+        }
+        ObjectNode obj = JSON_NODE_FACTORY.objectNode();
+        for (Field field : schema.fields()) {
+            obj.set(field.name(), convertToJson(field.schema(), struct.getWithoutDefault(field.name())));
+        }
+        return obj;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return BasicTypeInfo.STRING_TYPE_INFO;
+    }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index c7295e2..3329b23 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -327,4 +327,4 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
}
-}
+}
\ No newline at end of file
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 85790d9..4a44be9 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -78,13 +78,14 @@ public class CdcTools {
String includingTables = params.get("including-tables");
String excludingTables = params.get("excluding-tables");
boolean createTableOnly = params.has("create-table-only");
+ boolean ignoreDefaultValue = params.has("ignore-default-value");
Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
Map<String, String> tableMap = getConfigMap(params, "table-conf");
Configuration sinkConfig = Configuration.fromMap(sinkMap);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- databaseSync.create(env, database, config, tablePrefix, tableSuffix,
includingTables, excludingTables, sinkConfig, tableMap, createTableOnly);
+ databaseSync.create(env, database, config, tablePrefix, tableSuffix,
includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap,
createTableOnly);
databaseSync.build();
if(StringUtils.isNullOrWhitespaceOnly(jobName)){
jobName = String.format("%s-Doris Sync Database: %s", type,
config.getString("database-name","db"));
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 799eff1..82424c1 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -57,6 +57,7 @@ public abstract class DatabaseSync {
protected Pattern excludingPattern;
protected Map<String, String> tableConfig;
protected Configuration sinkConfig;
+ protected boolean ignoreDefaultValue;
public StreamExecutionEnvironment env;
private boolean createTableOnly = false;
@@ -68,13 +69,15 @@ public abstract class DatabaseSync {
public void create(StreamExecutionEnvironment env, String database,
Configuration config,
String tablePrefix, String tableSuffix, String
includingTables,
- String excludingTables, Configuration sinkConfig,
Map<String, String> tableConfig, boolean createTableOnly) {
+ String excludingTables, boolean ignoreDefaultValue,
Configuration sinkConfig,
+ Map<String, String> tableConfig, boolean createTableOnly) {
this.env = env;
this.config = config;
this.database = database;
this.converter = new TableNameConverter(tablePrefix, tableSuffix);
this.includingPattern = includingTables == null ? null :
Pattern.compile(includingTables);
this.excludingPattern = excludingTables == null ? null :
Pattern.compile(excludingTables);
+ this.ignoreDefaultValue = ignoreDefaultValue;
this.sinkConfig = sinkConfig;
this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
//default enable light schema change
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 4d6d250..ac047e4 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -23,8 +23,11 @@ import
com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
+
+import
org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.DateToStringConverter;
import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -178,11 +181,14 @@ public class MysqlDatabaseSync extends DatabaseSync {
}
sourceBuilder.jdbcProperties(jdbcProperties);
sourceBuilder.debeziumProperties(debeziumProperties);
-
- Map<String, Object> customConverterConfigs = new HashMap<>();
- customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
- JsonDebeziumDeserializationSchema schema =
- new JsonDebeziumDeserializationSchema(false,
customConverterConfigs);
+ DebeziumDeserializationSchema<String> schema;
+ if (ignoreDefaultValue) {
+ schema = new DorisJsonDebeziumDeserializationSchema();
+ } else {
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
+ schema = new JsonDebeziumDeserializationSchema(false,
customConverterConfigs);
+ }
MySqlSource<String> mySqlSource =
sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
DataStreamSource<String> streamSource = env.fromSource(
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index c940477..4a109c2 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -64,8 +64,9 @@ public class CdcMysqlSyncDatabaseCase {
String includingTables = "tbl1|tbl2|tbl3";
String excludingTables = "";
+ boolean ignoreDefaultValue = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
-
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig,
false);
+
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
false);
databaseSync.build();
env.execute(String.format("MySQL-Doris Database Sync: %s", database));
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 23610ab..b3b4384 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -70,8 +70,9 @@ public class CdcOraclelSyncDatabaseCase {
String includingTables = "test.*";
String excludingTables = "";
+ boolean ignoreDefaultValue = false;
DatabaseSync databaseSync = new OracleDatabaseSync();
-
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig,
false);
+
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
false);
databaseSync.build();
env.execute(String.format("Oracle-Doris Database Sync: %s", database));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]