This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 78ed055 [Improve](schemaChange)schema change support rename column
(#206)
78ed055 is described below
commit 78ed055fd2e7f7657dadfb1d3f4f4dd9e9caadf7
Author: DongLiang-0 <[email protected]>
AuthorDate: Fri Oct 27 16:02:32 2023 +0800
[Improve](schemaChange)schema change support rename column (#206)
---
.../sink/writer/JsonDebeziumSchemaSerializer.java | 20 ++++++-
.../flink/sink/writer/SchemaChangeHelper.java | 23 +++++++-
.../flink/sink/writer/SchemaChangeHelperTest.java | 65 ++++++++++++++++++++++
.../writer/TestJsonDebeziumSchemaSerializer.java | 25 +++++++++
4 files changed, 130 insertions(+), 3 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index fd3c92a..bf7b81f 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -82,6 +82,8 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s";
// alter table tbl add cloumn aca int
private static final String addDropDDLRegex
=
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
+    // Recognizes source-side rename DDL such as "ALTER TABLE t1 RENAME COLUMN c3 TO c333".
+    // Identifiers may be backtick-quoted and the table may be schema-qualified (`db`.`tbl`);
+    // the backticks are matched outside groups 2 and 3 so the captured column names are bare.
+    // Group 1: table, group 2: old column name, group 3: new column name.
+    private static final Pattern renameDDLPattern = Pattern.compile(
+            "ALTER\\s+TABLE\\s+((?:`?\\w+`?\\.)?`?\\w+`?)\\s+RENAME\\s+COLUMN\\s+`?(\\w+)`?\\s+TO\\s+`?(\\w+)`?",
+            Pattern.CASE_INSENSITIVE);
private final Pattern addDropDDLPattern;
private DorisOptions dorisOptions;
private ObjectMapper objectMapper = new ObjectMapper();
@@ -249,12 +251,24 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
sourceConnector =
SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase());
fillOriginSchema(columns);
}
+
+ // rename ddl
+ Matcher renameMatcher = renameDDLPattern.matcher(ddl);
+ if (renameMatcher.find()) {
+ String oldColumnName = renameMatcher.group(2);
+ String newColumnName = renameMatcher.group(3);
+ return SchemaChangeHelper.generateRenameDDLSql(
+ dorisOptions.getTableIdentifier(), oldColumnName,
newColumnName, originFieldSchemaMap);
+ }
+
+ // add/drop ddl
Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
for (JsonNode column : columns) {
buildFieldSchema(updateFiledSchema, column);
}
SchemaChangeHelper.compareSchema(updateFiledSchema,
originFieldSchemaMap);
- // In order to avoid operations such as rename or change, which may
lead to the accidental deletion of the doris column.
+ // In order to avoid other source table column change operations other
than add/drop/rename,
+ // which may lead to the accidental deletion of the doris column.
Matcher matcher = addDropDDLPattern.matcher(ddl);
if (!matcher.find()) {
return null;
@@ -262,6 +276,10 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return
SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
}
+    /**
+     * Replaces the tracked source-table schema (column name to {@link FieldSchema}).
+     * Exposed for tests only; the given map is stored by reference, not copied.
+     *
+     * @param originFieldSchemaMap the schema map to use from now on
+     */
+    @VisibleForTesting
+    public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
+        this.originFieldSchemaMap = originFieldSchemaMap;
+    }
@VisibleForTesting
public boolean schemaChange(JsonNode recordRoot) {
boolean status = false;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
index dc8d83b..8e6307b 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
@@ -32,8 +32,9 @@ public class SchemaChangeHelper {
private static final List<FieldSchema> addFieldSchemas =
Lists.newArrayList();
// Used to determine whether the doris table supports ddl
private static final List<DDLSchema> ddlSchemas = Lists.newArrayList();
- public static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
- public static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+ // Format templates for the Doris DDL statements emitted by this helper.
+ // NOTE(review): ADD_DDL and DROP_DDL were public before this change; narrowing them to private
+ // breaks any external code that referenced them - confirm there are no such users.
+ // ADD_DDL args: table, column name, column type (e.g. "ALTER TABLE db.tbl ADD COLUMN c4 BIGINT").
+ private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+ // DROP_DDL args: table, column name.
+ private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+ // RENAME_DDL args: table, old column name, new column name. The emitted statement has no TO
+ // keyword between the two names, unlike the source-side rename DDL matched by the serializer.
+ private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s
%s";
public static void compareSchema(Map<String, FieldSchema>
updateFiledSchemaMap,
Map<String, FieldSchema> originFieldSchemaMap) {
@@ -57,6 +58,24 @@ public class SchemaChangeHelper {
}
}
+    /**
+     * Builds the Doris DDL that renames a column and updates the tracked source schema to match.
+     *
+     * <p>If {@code oldColumnName} is not tracked in {@code originFieldSchemaMap}, no DDL is
+     * generated and the map is left untouched, so no {@code null} schema is ever stored under
+     * {@code newColumnName}.
+     *
+     * @param table Doris table identifier (e.g. {@code db.tbl}), inserted verbatim
+     * @param oldColumnName column name before the rename
+     * @param newColumnName column name after the rename
+     * @param originFieldSchemaMap tracked source schema keyed by column name; mutated in place
+     * @return a list containing the single rename statement, or an empty list if the column is
+     *     not tracked
+     */
+    public static List<String> generateRenameDDLSql(String table, String oldColumnName, String newColumnName,
+            Map<String, FieldSchema> originFieldSchemaMap) {
+        // Reset the static ddlSchemas list (used to check DDL support) so it reflects only this call.
+        ddlSchemas.clear();
+        List<String> ddlList = Lists.newArrayList();
+        if (!originFieldSchemaMap.containsKey(oldColumnName)) {
+            // Unknown column: emit nothing and keep the tracked schema consistent.
+            return ddlList;
+        }
+        ddlList.add(String.format(RENAME_DDL, table, oldColumnName, newColumnName));
+        ddlSchemas.add(new DDLSchema(oldColumnName, false));
+        // Re-key the existing schema under the new name so later add/drop comparisons see it.
+        // NOTE(review): the FieldSchema object keeps its original name; confirm whether it should change too.
+        FieldSchema fieldSchema = originFieldSchemaMap.remove(oldColumnName);
+        originFieldSchemaMap.put(newColumnName, fieldSchema);
+        return ddlList;
+    }
+
public static List<String> generateDDLSql(String table) {
ddlSchemas.clear();
List<String> ddlList = Lists.newArrayList();
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java
new file mode 100644
index 0000000..62906df
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class SchemaChangeHelperTest {
+
+    // Insertion-ordered maps so iteration order (and therefore, presumably, the order of the
+    // generated ADD COLUMN statements) does not depend on HashMap bucket layout.
+    private final Map<String, FieldSchema> originFieldSchemaMap = Maps.newLinkedHashMap();
+    private final Map<String, FieldSchema> updateFieldSchemaMap = Maps.newLinkedHashMap();
+
+    @Before
+    public void setUp() {
+        originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
+        originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
+        originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
+
+        updateFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
+        updateFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
+        updateFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
+        updateFieldSchemaMap.put("c4", new FieldSchema("c4", "BIGINT", "", ""));
+        updateFieldSchemaMap.put("c5", new FieldSchema("c5", "DATETIMEV2(0)", "", ""));
+    }
+
+    @Test
+    public void testGenerateRenameDDLSql() {
+        String table = "test.test_sink";
+        String oldColumnName = "c3";
+        String newColumnName = "c33";
+        List<String> ddlSqls = SchemaChangeHelper.generateRenameDDLSql(table, oldColumnName, newColumnName,
+                originFieldSchemaMap);
+        Assert.assertEquals(1, ddlSqls.size());
+        Assert.assertEquals("ALTER TABLE test.test_sink RENAME COLUMN c3 c33", ddlSqls.get(0));
+        // The tracked schema must follow the rename so later add/drop comparisons use the new name.
+        Assert.assertFalse(originFieldSchemaMap.containsKey(oldColumnName));
+        Assert.assertTrue(originFieldSchemaMap.containsKey(newColumnName));
+    }
+
+    @Test
+    public void testGenerateRenameDDLSqlUnknownColumn() {
+        List<String> ddlSqls = SchemaChangeHelper.generateRenameDDLSql("test.test_sink", "not_exist", "c6",
+                originFieldSchemaMap);
+        Assert.assertTrue(ddlSqls.isEmpty());
+    }
+
+    @Test
+    public void testGenerateDDLSql() {
+        SchemaChangeHelper.compareSchema(updateFieldSchemaMap, originFieldSchemaMap);
+        List<String> ddlSqls = SchemaChangeHelper.generateDDLSql("test.test_sink");
+        // JUnit's assertEquals takes (expected, actual).
+        Assert.assertEquals("ALTER TABLE test.test_sink ADD COLUMN c4 BIGINT", ddlSqls.get(0));
+        Assert.assertEquals("ALTER TABLE test.test_sink ADD COLUMN c5 DATETIMEV2(0)", ddlSqls.get(1));
+    }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index c4bdbc0..59bfe44 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -31,6 +31,7 @@ import org.apache.doris.flink.rest.models.Field;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -286,6 +287,30 @@ public class TestJsonDebeziumSchemaSerializer {
Assert.assertEquals(dorisTypeName, "VARCHAR(384)");
}
+ @Test
+ public void testExtractDDLListRename() throws IOException {
+ String columnInfo
+ =
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1698314781975,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000046\",\"pos\":5197,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5197,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_se
[...]
+ Map<String, FieldSchema> originFieldSchemaMap = Maps.newHashMap();
+ JsonNode record = objectMapper.readTree(columnInfo);
+
+ DorisOptions dorisOptions =
DorisOptions.builder().setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("test.t1")
+ .setUsername("root")
+ .setPassword("").build();
+ JsonDebeziumSchemaSerializer serializer =
JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions)
+ .build();
+ serializer.setSourceConnector("mysql");
+
+ originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
+ originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
+ originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)",
"", ""));
+ serializer.setOriginFieldSchemaMap(originFieldSchemaMap);
+
+ List<String> ddlList = serializer.extractDDLList(record);
+ Assert.assertEquals("ALTER TABLE test.t1 RENAME COLUMN c3 c333",
ddlList.get(0));
+ }
+
@Ignore
@Test
public void testSerializeAddColumn() throws IOException, DorisException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]