This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new ed4a4c39 [Fix](cdc) Ignore when mongodb schema change fails (#537)
ed4a4c39 is described below
commit ed4a4c39a6fcd6e287df91c593681c733609765d
Author: wudi <[email protected]>
AuthorDate: Fri Jan 10 12:08:31 2025 +0800
[Fix](cdc) Ignore when mongodb schema change fails (#537)
---
.../serializer/jsondebezium/CdcSchemaChange.java | 5 +--
.../MongoDBJsonDebeziumSchemaSerializer.java | 7 +---
.../serializer/MongoJsonDebeziumSchemaChange.java | 44 ++++++++++++----------
3 files changed, 29 insertions(+), 27 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
index 858a5eff..ac00017d 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
@@ -20,8 +20,6 @@ package
org.apache.doris.flink.sink.writer.serializer.jsondebezium;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.doris.flink.sink.writer.ChangeEvent;
-import java.io.IOException;
-
/**
* When cdc connector captures data changes about source database schema
changes, you need to
* inherit this class to complete the synchronized changes to Doris schema.
Supports data messages
@@ -33,7 +31,8 @@ public abstract class CdcSchemaChange implements ChangeEvent {
protected abstract String extractTable(JsonNode record);
- public abstract boolean schemaChange(JsonNode recordRoot) throws
IOException;
+ /** Schema change */
+ public abstract boolean schemaChange(JsonNode recordRoot);
protected abstract String getCdcTableIdentifier(JsonNode record);
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
index d4a87ff8..296a3727 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
@@ -120,11 +120,8 @@ public class MongoDBJsonDebeziumSchemaSerializer
implements DorisRecordSerialize
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
String op = getOperateType(recordRoot);
- try {
- schemaChange.schemaChange(recordRoot);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ schemaChange.schemaChange(recordRoot);
+
return dataChange.serialize(record, recordRoot, op);
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
index 01eebd45..c3a4a7d8 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
@@ -89,26 +89,32 @@ public class MongoJsonDebeziumSchemaChange extends
CdcSchemaChange {
}
@Override
- public boolean schemaChange(JsonNode recordRoot) throws IOException {
- JsonNode logData = getFullDocument(recordRoot);
- String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
- String dorisTableIdentifier =
- getDorisTableIdentifier(cdcTableIdentifier, dorisOptions,
tableMapping);
- String[] tableInfo = dorisTableIdentifier.split("\\.");
- if (tableInfo.length != 2) {
- throw new DorisRuntimeException();
+ public boolean schemaChange(JsonNode recordRoot) {
+ try {
+ JsonNode logData = getFullDocument(recordRoot);
+ String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+ String dorisTableIdentifier =
+ getDorisTableIdentifier(cdcTableIdentifier, dorisOptions,
tableMapping);
+ String[] tableInfo = dorisTableIdentifier.split("\\.");
+ if (tableInfo.length != 2) {
+ throw new DorisRuntimeException();
+ }
+ String dataBase = tableInfo[0];
+ String table = tableInfo[1];
+ // build table fields mapping for all record
+ buildDorisTableFieldsMapping(dataBase, table);
+
+ // Determine whether change stream log and tableField are exactly
the same, if not,
+ // perform
+ // schema change
+ checkAndUpdateSchemaChange(logData, dorisTableIdentifier,
dataBase, table);
+ formatSpecialFieldData(logData);
+ ((ObjectNode) recordRoot).set(FIELD_DATA, logData);
+ return true;
+ } catch (Exception ex) {
+ LOG.warn("schema change error : ", ex);
+ return false;
}
- String dataBase = tableInfo[0];
- String table = tableInfo[1];
- // build table fields mapping for all record
- buildDorisTableFieldsMapping(dataBase, table);
-
- // Determine whether change stream log and tableField are exactly the
same, if not, perform
- // schema change
- checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase,
table);
- formatSpecialFieldData(logData);
- ((ObjectNode) recordRoot).set(FIELD_DATA, logData);
- return true;
}
private void formatSpecialFieldData(JsonNode logData) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]