This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c77f9d7 [fix](cdc) single sink add tableprefix and tablesuffix (#301)
c77f9d7 is described below
commit c77f9d7a23c55d2451cbff99f9c941491de35e78
Author: wudi <[email protected]>
AuthorDate: Tue Jan 23 14:41:28 2024 +0800
[fix](cdc) single sink add tableprefix and tablesuffix (#301)
Currently, when single-sink is enabled and CDC automatically creates a
table, the created table does not pick up the configured table prefix and suffix.
---
.../serializer/JsonDebeziumSchemaSerializer.java | 34 +++++++++++++++++++---
.../jsondebezium/JsonDebeziumChangeContext.java | 16 +++++++++-
.../JsonDebeziumSchemaChangeImplV2.java | 12 +++++++-
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 ++
.../jsondebezium/TestJsonDebeziumDataChange.java | 8 +++--
.../TestJsonDebeziumSchemaChangeImpl.java | 4 ++-
.../TestJsonDebeziumSchemaChangeImplV2.java | 4 ++-
.../doris/flink/tools/cdc/MySQLDorisE2ECase.java | 2 +-
8 files changed, 71 insertions(+), 11 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 370bca7..5bea2d9 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.writer.serializer;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
@@ -66,6 +67,8 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
// create table properties
private Map<String, String> tableProperties;
private String targetDatabase;
+ private String targetTablePrefix;
+ private String targetTableSuffix;
private JsonDebeziumDataChange dataChange;
private JsonDebeziumSchemaChange schemaChange;
@@ -109,11 +112,15 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
DorisExecutionOptions executionOptions,
Map<String, String> tableMapping,
Map<String, String> tableProperties,
- String targetDatabase) {
+ String targetDatabase,
+ String targetTablePrefix,
+ String targetTableSuffix) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange,
executionOptions);
this.tableMapping = tableMapping;
this.tableProperties = tableProperties;
this.targetDatabase = targetDatabase;
+ this.targetTablePrefix = targetTablePrefix;
+ this.targetTableSuffix = targetTableSuffix;
init();
}
@@ -128,8 +135,9 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
objectMapper,
pattern,
lineDelimiter,
- ignoreUpdateBefore);
-
+ ignoreUpdateBefore,
+ targetTablePrefix,
+ targetTableSuffix);
this.schemaChange =
newSchemaChange
? new JsonDebeziumSchemaChangeImplV2(changeContext)
@@ -180,6 +188,8 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private Map<String, String> tableMapping;
private Map<String, String> tableProperties;
private String targetDatabase;
+ private String targetTablePrefix = "";
+ private String targetTableSuffix = "";
public JsonDebeziumSchemaSerializer.Builder
setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
@@ -221,6 +231,20 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return this;
}
+ public Builder setTargetTablePrefix(String tablePrefix) {
+ if (!StringUtils.isNullOrWhitespaceOnly(tablePrefix)) {
+ this.targetTablePrefix = tablePrefix;
+ }
+ return this;
+ }
+
+ public Builder setTargetTableSuffix(String tableSuffix) {
+ if (!StringUtils.isNullOrWhitespaceOnly(tableSuffix)) {
+ this.targetTableSuffix = tableSuffix;
+ }
+ return this;
+ }
+
public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(
dorisOptions,
@@ -230,7 +254,9 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
executionOptions,
tableMapping,
tableProperties,
- targetDatabase);
+ targetDatabase,
+ targetTablePrefix,
+ targetTableSuffix);
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
index 9c59f14..a7253d2 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -38,6 +38,8 @@ public class JsonDebeziumChangeContext implements
Serializable {
private final Pattern pattern;
private final String lineDelimiter;
private final boolean ignoreUpdateBefore;
+ private String targetTablePrefix;
+ private String targetTableSuffix;
public JsonDebeziumChangeContext(
DorisOptions dorisOptions,
@@ -48,7 +50,9 @@ public class JsonDebeziumChangeContext implements
Serializable {
ObjectMapper objectMapper,
Pattern pattern,
String lineDelimiter,
- boolean ignoreUpdateBefore) {
+ boolean ignoreUpdateBefore,
+ String targetTablePrefix,
+ String targetTableSuffix) {
this.dorisOptions = dorisOptions;
this.tableMapping = tableMapping;
this.sourceTableName = sourceTableName;
@@ -58,6 +62,8 @@ public class JsonDebeziumChangeContext implements
Serializable {
this.pattern = pattern;
this.lineDelimiter = lineDelimiter;
this.ignoreUpdateBefore = ignoreUpdateBefore;
+ this.targetTablePrefix = targetTablePrefix;
+ this.targetTableSuffix = targetTableSuffix;
}
public DorisOptions getDorisOptions() {
@@ -95,4 +101,12 @@ public class JsonDebeziumChangeContext implements
Serializable {
public boolean isIgnoreUpdateBefore() {
return ignoreUpdateBefore;
}
+
+ public String getTargetTablePrefix() {
+ return targetTablePrefix;
+ }
+
+ public String getTargetTableSuffix() {
+ return targetTableSuffix;
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 5604da7..b3a90e6 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -70,6 +70,8 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
// create table properties
private final Map<String, String> tableProperties;
private String targetDatabase;
+ private String targetTablePrefix;
+ private String targetTableSuffix;
public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext
changeContext) {
this.addDropDDLPattern = Pattern.compile(addDropDDLRegex,
Pattern.CASE_INSENSITIVE);
@@ -81,6 +83,14 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
this.tableProperties = changeContext.getTableProperties();
this.tableMapping = changeContext.getTableMapping();
this.objectMapper = changeContext.getObjectMapper();
+ this.targetTablePrefix =
+ changeContext.getTargetTablePrefix() == null
+ ? ""
+ : changeContext.getTargetTablePrefix();
+ this.targetTableSuffix =
+ changeContext.getTargetTableSuffix() == null
+ ? ""
+ : changeContext.getTargetTableSuffix();
}
@Override
@@ -253,7 +263,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
private String getCreateTableIdentifier(JsonNode record) {
String table = extractJsonNode(record.get("source"), "table");
- return targetDatabase + "." + table;
+ return targetDatabase + "." + targetTablePrefix + table +
targetTableSuffix;
}
private boolean checkSchemaChange(String database, String table, DDLSchema
ddlSchema)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index b6bb5e5..e153039 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -288,6 +288,8 @@ public abstract class DatabaseSync {
.setTableMapping(tableMapping)
.setTableProperties(tableConfig)
.setTargetDatabase(database)
+ .setTargetTablePrefix(tablePrefix)
+ .setTargetTableSuffix(tableSuffix)
.build())
.setDorisOptions(dorisBuilder.build());
return builder.build();
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
index d789807..8900339 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
@@ -48,7 +48,9 @@ public class TestJsonDebeziumDataChange extends
TestJsonDebeziumChangeBase {
objectMapper,
null,
lineDelimiter,
- ignoreUpdateBefore);
+ ignoreUpdateBefore,
+ "",
+ "");
dataChange = new JsonDebeziumDataChange(changeContext);
}
@@ -109,7 +111,9 @@ public class TestJsonDebeziumDataChange extends
TestJsonDebeziumChangeBase {
objectMapper,
null,
lineDelimiter,
- false);
+ false,
+ "",
+ "");
dataChange = new JsonDebeziumDataChange(changeContext);
// update t1 set name='doris-update' WHERE id =1;
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
index 9b003a8..c8997a1 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
@@ -55,7 +55,9 @@ public class TestJsonDebeziumSchemaChangeImpl extends
TestJsonDebeziumChangeBase
objectMapper,
null,
lineDelimiter,
- ignoreUpdateBefore);
+ ignoreUpdateBefore,
+ "",
+ "");
schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext);
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index c63267d..0ce60d3 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -58,7 +58,9 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
objectMapper,
null,
lineDelimiter,
- ignoreUpdateBefore);
+ ignoreUpdateBefore,
+ "",
+ "");
schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 242f93f..3390f75 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -72,7 +72,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
private static final String TABLE_4 = "tbl4";
private static final MySQLContainer MYSQL_CONTAINER =
- new MySQLContainer("mysql")
+ new MySQLContainer("mysql:8.0")
.withDatabaseName(DATABASE)
.withUsername(MYSQL_USER)
.withPassword(MYSQL_PASSWD);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]