This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 14be15b [Improve](schemaChange)schema change ddl supports
multi-column changes, synchronous defaults (#167)
14be15b is described below
commit 14be15bc140e18337fc927891946050e00230097
Author: DongLiang-0 <[email protected]>
AuthorDate: Thu Aug 3 18:02:59 2023 +0800
[Improve](schemaChange)schema change ddl supports multi-column changes,
synchronous defaults (#167)
---
flink-doris-connector/pom.xml | 2 +-
.../doris/flink/catalog/doris/FieldSchema.java | 16 ++
.../sink/writer/JsonDebeziumSchemaSerializer.java | 181 ++++++++++++++++++++-
.../flink/sink/writer/SchemaChangeHelper.java | 109 +++++++++++++
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 3 +-
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 9 +-
.../apache/doris/flink/CDCSchemaChangeExample.java | 2 +-
.../writer/TestJsonDebeziumSchemaSerializer.java | 66 ++++++++
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 3 +-
.../tools/cdc/CdcOraclelSyncDatabaseCase.java | 3 +-
10 files changed, 383 insertions(+), 11 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index b637fef..e10ad27 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -245,7 +245,7 @@ under the License.
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
- <version>2.3.0</version>
+ <version>2.4.1</version>
<scope>provided</scope>
</dependency>
<dependency>
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
index 8255bd3..3c28b74 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.catalog.doris;
public class FieldSchema {
private String name;
private String typeString;
+ private String defaultValue;
private String comment;
public FieldSchema() {
@@ -30,6 +31,13 @@ public class FieldSchema {
this.comment = comment;
}
+ public FieldSchema(String name, String typeString, String defaultValue,
String comment) {
+ this.name = name;
+ this.typeString = typeString;
+ this.defaultValue = defaultValue;
+ this.comment = comment;
+ }
+
public String getName() {
return name;
}
@@ -53,4 +61,12 @@ public class FieldSchema {
public void setComment(String comment) {
this.comment = comment;
}
+
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+
+ public void setDefaultValue(String defaultValue) {
+ this.defaultValue = defaultValue;
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index 3329b23..6b66e76 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -23,11 +23,18 @@ import
com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.commons.codec.binary.Base64;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.HttpGetWithEntity;
+import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema;
+import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
+
+import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
import org.apache.http.HttpHeaders;
@@ -44,8 +51,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -70,8 +80,12 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private String table;
//table name of the cdc upstream, format is db.tbl
private String sourceTableName;
+ private boolean firstLoad;
+ private boolean firstSchemaChange;
+ private Map<String, FieldSchema> originFieldSchemaMap;
+ private final boolean newSchemaChange;
- public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern
pattern, String sourceTableName) {
+ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern
pattern, String sourceTableName, boolean newSchemaChange) {
this.dorisOptions = dorisOptions;
this.addDropDDLPattern = pattern == null ?
Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
@@ -82,6 +96,9 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
JsonNodeFactory jsonNodeFactory =
JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper.setNodeFactory(jsonNodeFactory);
+ this.newSchemaChange = newSchemaChange;
+ this.firstLoad = true;
+ this.firstSchemaChange = true;
}
@Override
@@ -91,9 +108,17 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
//schema change ddl
- schemaChange(recordRoot);
+ if (newSchemaChange) {
+ schemaChangeV2(recordRoot);
+ } else {
+ schemaChange(recordRoot);
+ }
return null;
}
+
+ if (newSchemaChange && firstLoad) {
+ initOriginFieldSchema(recordRoot);
+ }
Map<String, String> valueMap;
switch (op) {
case OP_READ:
@@ -113,6 +138,70 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return
objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
}
+ public boolean schemaChangeV2(JsonNode recordRoot) {
+ boolean status = false;
+ try {
+ if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) &&
!checkTable(recordRoot)) {
+ return false;
+ }
+ List<String> ddlSqlList = extractDDLList(recordRoot);
+ if (CollectionUtils.isEmpty(ddlSqlList)) {
+ LOG.info("ddl can not do schema change:{}", recordRoot);
+ return false;
+ }
+
+ List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
+ for (int i = 0; i < ddlSqlList.size(); i++) {
+ DDLSchema ddlSchema = ddlSchemas.get(i);
+ String ddlSql = ddlSqlList.get(i);
+ boolean doSchemaChange = checkSchemaChange(ddlSchema);
+ status = doSchemaChange && execSchemaChange(ddlSql);
+ LOG.info("schema change status:{}", status);
+ }
+ } catch (Exception ex) {
+ LOG.warn("schema change error :", ex);
+ }
+ return status;
+ }
+
+ private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException,
IllegalArgumentException {
+ String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
+ Map<String,Object> param = buildRequestParam(ddlSchema);
+ HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
+ httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+ httpGet.setEntity(new
StringEntity(objectMapper.writeValueAsString(param)));
+ boolean success = handleResponse(httpGet);
+ if (!success) {
+ LOG.warn("schema change can not do table {}.{}",database,table);
+ }
+ return success;
+ }
+
+ @VisibleForTesting
+ public List<String> extractDDLList(JsonNode record) throws
JsonProcessingException {
+ JsonNode historyRecord = objectMapper.readTree(extractJsonNode(record,
"historyRecord"));
+ JsonNode tableChanges = historyRecord.get("tableChanges");
+ JsonNode tableChange = tableChanges.get(0);
+ String ddl = extractJsonNode(historyRecord, "ddl");
+ LOG.debug("received debezium ddl :{}", ddl);
+
+ Matcher matcher = addDropDDLPattern.matcher(ddl);
+ if (Objects.isNull(tableChange)||
!tableChange.get("type").asText().equals("ALTER") || !matcher.find()) {
+ return null;
+ }
+
+ JsonNode columns = tableChange.get("table").get("columns");
+ if (firstSchemaChange) {
+ fillOriginSchema(columns);
+ }
+ Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
+ for (JsonNode column : columns) {
+ buildFieldSchema(updateFiledSchema, column);
+ }
+ SchemaChangeHelper.compareSchema(updateFiledSchema,
originFieldSchemaMap);
+ return
SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
+ }
+
@VisibleForTesting
public boolean schemaChange(JsonNode recordRoot) {
boolean status = false;
@@ -168,6 +257,13 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return success;
}
+ protected Map<String, Object> buildRequestParam(DDLSchema ddlSchema) {
+ Map<String, Object> params = new HashMap<>();
+ params.put("isDropColumn", ddlSchema.isDropColumn());
+ params.put("columnName", ddlSchema.getColumnName());
+ return params;
+ }
+
/**
* Build param
* {
@@ -233,7 +329,8 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
}
private String extractJsonNode(JsonNode record, String key) {
- return record != null && record.get(key) != null ?
record.get(key).asText() : null;
+ return record != null && record.get(key) != null &&
+ !(record.get(key) instanceof NullNode) ?
record.get(key).asText() : null;
}
private Map<String, String> extractBeforeRow(JsonNode record) {
@@ -277,6 +374,76 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return "Basic " + new
String(Base64.encodeBase64((dorisOptions.getUsername() + ":" +
dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
}
+ @VisibleForTesting
+ public void fillOriginSchema(JsonNode columns) {
+ if (Objects.nonNull(originFieldSchemaMap)) {
+ for (JsonNode column : columns) {
+ String fieldName = column.get("name").asText();
+ if (originFieldSchemaMap.containsKey(fieldName)) {
+ JsonNode length = column.get("length");
+ JsonNode scale = column.get("scale");
+ String type =
MysqlType.toDorisType(column.get("typeName").asText(),
+ length == null ? 0 : length.asInt(),
+ scale == null ? 0 : scale.asInt());
+ String defaultValue =
handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
+ String comment = extractJsonNode(column, "comment");
+ FieldSchema fieldSchema =
originFieldSchemaMap.get(fieldName);
+ fieldSchema.setName(fieldName);
+ fieldSchema.setTypeString(type);
+ fieldSchema.setComment(comment);
+ fieldSchema.setDefaultValue(defaultValue);
+ }
+ }
+ } else {
+ originFieldSchemaMap = new LinkedHashMap<>();
+ columns.forEach(column -> buildFieldSchema(originFieldSchemaMap,
column));
+ }
+ firstSchemaChange = false;
+ firstLoad = false;
+ }
+
+ private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap,
JsonNode column) {
+ String fieldName = column.get("name").asText();
+ JsonNode length = column.get("length");
+ JsonNode scale = column.get("scale");
+ String type = MysqlType.toDorisType(column.get("typeName").asText(),
+ length == null ? 0 : length.asInt(), scale == null ? 0 :
scale.asInt());
+ String defaultValue = handleDefaultValue(extractJsonNode(column,
"defaultValueExpression"));
+ String comment = extractJsonNode(column, "comment");
+ filedSchemaMap.put(fieldName, new FieldSchema(fieldName, type,
defaultValue, comment));
+ }
+
+ private String handleDefaultValue(String defaultValue) {
+ if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
+ return null;
+ }
+        // Due to historical reasons, Doris requires quotes around the default
value of a newly added column
+ // For example in mysql: alter table add column c1 int default 100
+ // In Doris: alter table add column c1 int default '100'
+ if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
+ return defaultValue;
+ } else if (defaultValue.equals("1970-01-01 00:00:00")) {
+            // TODO: CDC reports a current-timestamp default as the literal
1970-01-01 00:00:00; map it to current_timestamp
+ return "current_timestamp";
+ }
+ return "'" + defaultValue + "'";
+ }
+
+ private void initOriginFieldSchema(JsonNode recordRoot) {
+ originFieldSchemaMap = new LinkedHashMap<>();
+ Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
+ if (CollectionUtils.isEmpty(columnNameSet)) {
+ columnNameSet = extractBeforeRow(recordRoot).keySet();
+ }
+ columnNameSet.forEach(columnName ->
originFieldSchemaMap.put(columnName, new FieldSchema()));
+ firstLoad = false;
+ }
+
+ @VisibleForTesting
+ public Map<String, FieldSchema> getOriginFieldSchemaMap() {
+ return originFieldSchemaMap;
+ }
+
public static JsonDebeziumSchemaSerializer.Builder builder() {
return new JsonDebeziumSchemaSerializer.Builder();
}
@@ -288,12 +455,18 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private DorisOptions dorisOptions;
private Pattern addDropDDLPattern;
private String sourceTableName;
+ private boolean newSchemaChange;
public JsonDebeziumSchemaSerializer.Builder
setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
return this;
}
+ public JsonDebeziumSchemaSerializer.Builder setNewSchemaChange(boolean
newSchemaChange) {
+ this.newSchemaChange = newSchemaChange;
+ return this;
+ }
+
public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern
addDropDDLPattern) {
this.addDropDDLPattern = addDropDDLPattern;
return this;
@@ -305,7 +478,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
}
public JsonDebeziumSchemaSerializer build() {
- return new JsonDebeziumSchemaSerializer(dorisOptions,
addDropDDLPattern, sourceTableName);
+ return new JsonDebeziumSchemaSerializer(dorisOptions,
addDropDDLPattern, sourceTableName, newSchemaChange);
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
new file mode 100644
index 0000000..302770c
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.util.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class SchemaChangeHelper {
+ private static final List<String> dropFieldSchemas = Lists.newArrayList();
+ private static final List<FieldSchema> addFieldSchemas =
Lists.newArrayList();
+    // Used to determine whether the Doris table supports this DDL
+ private static final List<DDLSchema> ddlSchemas = Lists.newArrayList();
+ public static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+ public static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+ public static void compareSchema(Map<String, FieldSchema>
updateFiledSchemaMap,
+ Map<String, FieldSchema> originFieldSchemaMap) {
+ for (Entry<String, FieldSchema> updateFieldSchema :
updateFiledSchemaMap.entrySet()) {
+ String columName = updateFieldSchema.getKey();
+ if (!originFieldSchemaMap.containsKey(columName)) {
+ addFieldSchemas.add(updateFieldSchema.getValue());
+ originFieldSchemaMap.put(columName,
updateFieldSchema.getValue());
+ }
+ }
+ for (Entry<String, FieldSchema> originFieldSchema :
originFieldSchemaMap.entrySet()) {
+ String columName = originFieldSchema.getKey();
+ if (!updateFiledSchemaMap.containsKey(columName)) {
+ dropFieldSchemas.add(columName);
+ }
+ }
+ if (CollectionUtils.isNotEmpty(dropFieldSchemas)) {
+ dropFieldSchemas.forEach(originFieldSchemaMap::remove);
+ }
+ }
+
+ public static List<String> generateDDLSql(String table) {
+ ddlSchemas.clear();
+ List<String> ddlList = Lists.newArrayList();
+ for (FieldSchema fieldSchema : addFieldSchemas) {
+ String name = fieldSchema.getName();
+ String type = fieldSchema.getTypeString();
+ String defaultValue = fieldSchema.getDefaultValue();
+ String comment = fieldSchema.getComment();
+ String addDDL = String.format(ADD_DDL, table, name, type);
+ if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
+ addDDL = addDDL + " DEFAULT " + defaultValue;
+ }
+ if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+ addDDL = addDDL + " COMMENT " + comment;
+ }
+ ddlList.add(addDDL);
+ ddlSchemas.add(new DDLSchema(name, false));
+ }
+ for (String columName : dropFieldSchemas) {
+ String dropDDL = String.format(DROP_DDL, table, columName);
+ ddlList.add(dropDDL);
+ ddlSchemas.add(new DDLSchema(columName, true));
+ }
+
+ dropFieldSchemas.clear();
+ addFieldSchemas.clear();
+ return ddlList;
+ }
+
+ public static List<DDLSchema> getDdlSchemas() {
+ return ddlSchemas;
+ }
+
+ static class DDLSchema {
+ private final String columnName;
+ private final boolean isDropColumn;
+
+ public DDLSchema(String columnName, boolean isDropColumn) {
+ this.columnName = columnName;
+ this.isDropColumn = isDropColumn;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public boolean isDropColumn() {
+ return isDropColumn;
+ }
+ }
+
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 4a44be9..6d10266 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -79,13 +79,14 @@ public class CdcTools {
String excludingTables = params.get("excluding-tables");
boolean createTableOnly = params.has("create-table-only");
boolean ignoreDefaultValue = params.has("ignore-default-value");
+ boolean useNewSchemaChange = params.has("use-new-schema-change");
Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
Map<String, String> tableMap = getConfigMap(params, "table-conf");
Configuration sinkConfig = Configuration.fromMap(sinkMap);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- databaseSync.create(env, database, config, tablePrefix, tableSuffix,
includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap,
createTableOnly);
+ databaseSync.create(env, database, config, tablePrefix, tableSuffix,
includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap,
createTableOnly, useNewSchemaChange);
databaseSync.build();
if(StringUtils.isNullOrWhitespaceOnly(jobName)){
jobName = String.format("%s-Doris Sync Database: %s", type,
config.getString("database-name","db"));
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 82424c1..9604dcd 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -60,6 +60,7 @@ public abstract class DatabaseSync {
protected boolean ignoreDefaultValue;
public StreamExecutionEnvironment env;
private boolean createTableOnly = false;
+ private boolean newSchemaChange;
public abstract Connection getConnection() throws SQLException;
@@ -70,7 +71,7 @@ public abstract class DatabaseSync {
public void create(StreamExecutionEnvironment env, String database,
Configuration config,
String tablePrefix, String tableSuffix, String
includingTables,
String excludingTables, boolean ignoreDefaultValue,
Configuration sinkConfig,
- Map<String, String> tableConfig, boolean createTableOnly) {
+ Map<String, String> tableConfig, boolean createTableOnly, boolean
useNewSchemaChange) {
this.env = env;
this.config = config;
this.database = database;
@@ -85,6 +86,7 @@ public abstract class DatabaseSync {
this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
}
this.createTableOnly = createTableOnly;
+ this.newSchemaChange = useNewSchemaChange;
}
public void build() throws Exception {
@@ -185,7 +187,10 @@ public abstract class DatabaseSync {
}
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
-
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisBuilder.build()).build())
+ .setSerializer(JsonDebeziumSchemaSerializer.builder()
+ .setDorisOptions(dorisBuilder.build())
+ .setNewSchemaChange(newSchemaChange)
+ .build())
.setDorisOptions(dorisBuilder.build());
return builder.build();
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
index 3bad9be..62578d3 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
@@ -78,7 +78,7 @@ public class CDCSchemaChangeExample {
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisOptions)
-
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
+
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).setNewSchemaChange(true).build());
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL
Source")//.print();
.sinkTo(builder.build());
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index de5a4de..5c2827c 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -20,12 +20,16 @@ package org.apache.doris.flink.sink.writer;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.Field;
import org.apache.doris.flink.rest.models.Schema;
+
+import org.apache.commons.collections.CollectionUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -35,8 +39,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
/**
* test for JsonDebeziumSchemaSerializer.
@@ -114,6 +122,64 @@ public class TestJsonDebeziumSchemaSerializer {
}
+ @Test
+ public void testExtractDDLListMultipleColumns() throws IOException {
+ String sql0 = "ALTER TABLE test.t1 ADD COLUMN c2 INT";
+ String sql1 = "ALTER TABLE test.t1 ADD COLUMN c555 VARCHAR(300)";
+ String sql2 = "ALTER TABLE test.t1 ADD COLUMN c666 INT DEFAULT '100'";
+ String sql3 = "ALTER TABLE test.t1 ADD COLUMN c4 BIGINT DEFAULT '555'";
+ String sql4 = "ALTER TABLE test.t1 ADD COLUMN c199 INT";
+ String sql5 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'";
+ String sql6 = "ALTER TABLE test.t1 DROP COLUMN name";
+ String sql7 = "ALTER TABLE test.t1 DROP COLUMN test_time";
+ String sql8 = "ALTER TABLE test.t1 DROP COLUMN c1";
+ String sql9 = "ALTER TABLE test.t1 DROP COLUMN cc";
+ List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3,
sql4,sql5,sql6,sql7,sql8,sql9);
+
+ String record =
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
[...]
+ JsonNode recordRoot = objectMapper.readTree(record);
+ List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+ for (int i = 0; i < ddlSQLList.size(); i++) {
+ String srcSQL = srcSqlList.get(i);
+ String targetSQL = ddlSQLList.get(i);
+ Assert.assertEquals(srcSQL, targetSQL);
+ }
+ }
+
+ @Test
+ public void testExtractDDLListRenameColumn() throws IOException {
+ String record =
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
[...]
+ JsonNode recordRoot = objectMapper.readTree(record);
+ List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+ Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+ }
+
+ @Test
+ public void testFillOriginSchema() throws IOException {
+ Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
+ srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
+ srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)",
null, null));
+ srcFiledSchemaMap.put("test_time", new FieldSchema("test_time",
"DATETIMEV2(0)", null, null));
+ srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'",
null));
+
+ String columnsString =
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"co
[...]
+ JsonNode columns = objectMapper.readTree(columnsString);
+ serializer.fillOriginSchema(columns);
+ Map<String, FieldSchema> originFieldSchemaMap =
serializer.getOriginFieldSchemaMap();
+
+ Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
originFieldSchemaMap.entrySet().iterator();
+ for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
+ FieldSchema srcFiledSchema = entry.getValue();
+ Entry<String, FieldSchema> originField =
originFieldSchemaIterator.next();
+
+ Assert.assertEquals(entry.getKey(), originField.getKey());
+ Assert.assertEquals(srcFiledSchema.getName(),
originField.getValue().getName());
+ Assert.assertEquals(srcFiledSchema.getTypeString(),
originField.getValue().getTypeString());
+ Assert.assertEquals(srcFiledSchema.getDefaultValue(),
originField.getValue().getDefaultValue());
+ Assert.assertEquals(srcFiledSchema.getComment(),
originField.getValue().getComment());
+ }
+ }
+
@Ignore
@Test
public void testSerializeAddColumn() throws IOException, DorisException {
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 4a109c2..c20be39 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -65,8 +65,9 @@ public class CdcMysqlSyncDatabaseCase {
String includingTables = "tbl1|tbl2|tbl3";
String excludingTables = "";
boolean ignoreDefaultValue = false;
+ boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
-
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
false);
+
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
false, useNewSchemaChange);
databaseSync.build();
env.execute(String.format("MySQL-Doris Database Sync: %s", database));
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index b3b4384..08cf586 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -71,8 +71,9 @@ public class CdcOraclelSyncDatabaseCase {
String includingTables = "test.*";
String excludingTables = "";
boolean ignoreDefaultValue = false;
+ boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new OracleDatabaseSync();
-
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
false);
+
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
false, useNewSchemaChange);
databaseSync.build();
env.execute(String.format("Oracle-Doris Database Sync: %s", database));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]