This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7a763e0 [improve](cdc)improve compatibility with other databases by
introducing an option to synchronize default values when inserting null data.
(#361)
7a763e0 is described below
commit 7a763e0f97e05cad4509ea259fdc99a186d3017b
Author: Petrichor <[email protected]>
AuthorDate: Tue Apr 9 11:32:28 2024 +0800
[improve](cdc)improve compatibility with other databases by introducing an
option to synchronize default values when inserting null data. (#361)
---
.../doris/flink/tools/cdc/oracle/OracleDatabaseSync.java | 12 +++++++++---
.../flink/tools/cdc/postgres/PostgresDatabaseSync.java | 12 +++++++++---
.../flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java | 14 +++++++++++---
.../doris/flink/tools/cdc/sqlserver/SqlServerType.java | 2 ++
4 files changed, 31 insertions(+), 9 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index 945c839..89214fd 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -28,10 +28,12 @@ import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
@@ -175,9 +177,13 @@ public class OracleDatabaseSync extends DatabaseSync {
}
}
- Map<String, Object> customConverterConfigs = new HashMap<>();
- JsonDebeziumDeserializationSchema schema =
- new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+ DebeziumDeserializationSchema<String> schema;
+ if (ignoreDefaultValue) {
+ schema = new DorisJsonDebeziumDeserializationSchema();
+ } else {
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+ schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+ }
if (config.getBoolean(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) {
JdbcIncrementalSource<String> incrSource =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index 2645d83..490fdbc 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -28,10 +28,12 @@ import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
@@ -160,9 +162,13 @@ public class PostgresDatabaseSync extends DatabaseSync {
}
}
- Map<String, Object> customConverterConfigs = new HashMap<>();
- JsonDebeziumDeserializationSchema schema =
- new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+ DebeziumDeserializationSchema<String> schema;
+ if (ignoreDefaultValue) {
+ schema = new DorisJsonDebeziumDeserializationSchema();
+ } else {
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+ schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+ }
if (config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) {
JdbcIncrementalSource<String> incrSource =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index f4d6ba3..9f286ff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -26,12 +26,15 @@ import com.ververica.cdc.connectors.base.options.JdbcSourceOptions;
import com.ververica.cdc.connectors.base.options.SourceOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
@@ -155,9 +158,14 @@ public class SqlServerDatabaseSync extends DatabaseSync {
}
}
- Map<String, Object> customConverterConfigs = new HashMap<>();
- JsonDebeziumDeserializationSchema schema =
- new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+ DebeziumDeserializationSchema<String> schema;
+ if (ignoreDefaultValue) {
+ schema = new DorisJsonDebeziumDeserializationSchema();
+ } else {
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+ customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
+ schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+ }
if (config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) {
JdbcIncrementalSource<String> incrSource =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
index 6c92ae4..ff37c06 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
@@ -41,6 +41,7 @@ public class SqlServerType {
private static final String NVARCHAR = "nvarchar";
private static final String TEXT = "text";
private static final String NTEXT = "ntext";
+ private static final String XML = "xml";
private static final String UNIQUEIDENTIFIER = "uniqueidentifier";
private static final String TIME = "time";
private static final String TIMESTAMP = "timestamp";
@@ -104,6 +105,7 @@ public class SqlServerType {
case UNIQUEIDENTIFIER:
case BINARY:
case VARBINARY:
+ case XML:
return DorisType.STRING;
default:
throw new UnsupportedOperationException(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]