This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new ef4feeb [Bug] fix can not create table problem (#252)
ef4feeb is described below
commit ef4feeb17aabf007c6ba763d8a76f33a26baa7a3
Author: wudi <[email protected]>
AuthorDate: Thu Nov 30 19:32:58 2023 +0800
[Bug] fix can not create table problem (#252)
Co-authored-by: wudi <>
---
.../serializer/JsonDebeziumSchemaSerializer.java | 52 ++++++++++++++++------
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 +
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 4 +-
3 files changed, 41 insertions(+), 17 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 29d9cee..25d06ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -93,6 +93,9 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private SchemaChangeManager schemaChangeManager;
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;
+ // create table properties
+ private Map<String, String> tableProperties;
+ private String targetDatabase;
public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
@@ -134,24 +137,19 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
String sourceTableName,
boolean newSchemaChange,
DorisExecutionOptions executionOptions,
- Map<String, String> tableMapping) {
+ Map<String, String> tableMapping,
+ Map<String, String> tableProperties,
+ String targetDatabase) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions);
this.tableMapping = tableMapping;
+ this.tableProperties = tableProperties;
+ this.targetDatabase = targetDatabase;
}
@Override
public DorisRecord serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
-
- //Filter out table records that are not in tableMapping
- String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
- String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
- if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
- LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
- return null;
- }
-
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
@@ -166,6 +164,15 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
if (newSchemaChange && firstLoad) {
initOriginFieldSchema(recordRoot);
}
+
+ //Filter out table records that are not in tableMapping
+ String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+ String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
+ if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
+ LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
+ return null;
+ }
+
Map<String, Object> valueMap;
switch (op) {
case OP_READ:
@@ -313,11 +320,16 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
@VisibleForTesting
public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
+ if(sourceConnector == null){
+ sourceConnector = SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase());
+ }
+
String dorisTable = getCreateTableIdentifier(record);
JsonNode tableChange = extractTableChange(record);
JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
JsonNode columns = tableChange.get("table").get("columns");
- String tblComment = tableChange.get("table").get("comment").asText();
+ JsonNode comment = tableChange.get("table").get("comment");
+ String tblComment = comment == null ? "" : comment.asText();
Map<String, FieldSchema> field = new LinkedHashMap<>();
for (JsonNode column : columns) {
buildFieldSchema(field, column);
@@ -333,6 +345,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
tableSchema.setKeys(pkList);
tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
tableSchema.setTableComment(tblComment);
+ tableSchema.setProperties(tableProperties);
String[] split = dorisTable.split("\\.");
Preconditions.checkArgument(split.length == 2);
@@ -402,9 +415,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
}
public String getCreateTableIdentifier(JsonNode record){
- String db = extractJsonNode(record.get("source"), "db");
String table = extractJsonNode(record.get("source"), "table");
- return db + "." + table;
+ return targetDatabase + "." + table;
}
public String getDorisTableIdentifier(String cdcTableIdentifier){
@@ -657,6 +669,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private boolean newSchemaChange;
private DorisExecutionOptions executionOptions;
private Map<String, String> tableMapping;
+ private Map<String, String> tableProperties;
+ private String targetDatabase;
public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
@@ -688,9 +702,19 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
return this;
}
+ public Builder setTableProperties(Map<String, String> tableProperties) {
+ this.tableProperties = tableProperties;
+ return this;
+ }
+
+ public Builder setTargetDatabase(String targetDatabase) {
+ this.targetDatabase = targetDatabase;
+ return this;
+ }
+
public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange,
- executionOptions, tableMapping);
+ executionOptions, tableMapping, tableProperties, targetDatabase);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 02ab034..fe5357e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -242,6 +242,8 @@ public abstract class DatabaseSync {
.setNewSchemaChange(newSchemaChange)
.setExecutionOptions(executionOptions)
.setTableMapping(tableMapping)
+ .setTableProperties(tableConfig)
+ .setTargetDatabase(database)
.build())
.setDorisOptions(dorisBuilder.build());
return builder.build();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index a3e01d3..a86b2ab 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -122,9 +122,7 @@ public class MysqlDatabaseSync extends DatabaseSync {
.username(config.get(MySqlSourceOptions.USERNAME))
.password(config.get(MySqlSourceOptions.PASSWORD))
.databaseList(databaseName)
- .tableList(tableName)
- //default open add newly table
- .scanNewlyAddedTableEnabled(true);
+ .tableList(tableName);
config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
config
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]