This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 698e9cd243 [fix](demo)fix cdc failed to synchronize datetime type in
mysql, and added JsonDebeziumSchemaSerializer (#16971)
698e9cd243 is described below
commit 698e9cd243092b4b47a818fa0740998cfc6fef7f
Author: DongLiang-0 <[email protected]>
AuthorDate: Thu Mar 2 14:14:58 2023 +0800
[fix](demo)fix cdc failed to synchronize datetime type in mysql, and added
JsonDebeziumSchemaSerializer (#16971)
* [fix](demo)fix cdc failed to synchronize datetime type in mysql, and
added JsonDebeziumSchemaSerializer
* add licenses for DateToStringConverter
---
samples/doris-demo/flink-demo-v1.1/pom.xml | 23 ++--
.../flink/converter/DateToStringConverter.java | 146 +++++++++++++++++++++
.../demo/flink/dbsync/DatabaseFullDelSync.java | 1 +
.../doris/demo/flink/dbsync/DatabaseFullSync.java | 61 ++-------
4 files changed, 170 insertions(+), 61 deletions(-)
diff --git a/samples/doris-demo/flink-demo-v1.1/pom.xml
b/samples/doris-demo/flink-demo-v1.1/pom.xml
index c1e00ea318..87054383ec 100644
--- a/samples/doris-demo/flink-demo-v1.1/pom.xml
+++ b/samples/doris-demo/flink-demo-v1.1/pom.xml
@@ -29,7 +29,7 @@ under the License.
<properties>
<scala.version>2.12</scala.version>
<java.version>1.8</java.version>
- <flink.version>1.14.3</flink.version>
+ <flink.version>1.15.3</flink.version>
<fastjson.version>1.2.62</fastjson.version>
<hadoop.version>2.8.3</hadoop.version>
<scope.mode>compile</scope.mode>
@@ -53,17 +53,17 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.version}</artifactId>
+ <artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
+ <artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_${scala.version}</artifactId>
+ <artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
@@ -85,18 +85,13 @@ under the License.
<!-- flink-doris-connector -->
<dependency>
<groupId>org.apache.doris</groupId>
- <artifactId>flink-doris-connector-1.14_2.12</artifactId>
- <version>1.1.0</version>
- </dependency>
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.12</version>
+ <artifactId>flink-doris-connector-1.15</artifactId>
+ <version>1.2.1</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
- <artifactId>flink-sql-connector-mysql-cdc</artifactId>
- <version>2.2.1</version>
+ <artifactId>flink-connector-mysql-cdc</artifactId>
+ <version>2.3.0</version>
<exclusions>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
@@ -106,7 +101,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime-web_${scala.version}</artifactId>
+ <artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
diff --git
a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/converter/DateToStringConverter.java
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/converter/DateToStringConverter.java
new file mode 100644
index 0000000000..fec014e539
--- /dev/null
+++
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/converter/DateToStringConverter.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.converter;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.time.DateTimeException;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class DateToStringConverter implements CustomConverter<SchemaBuilder,
RelationalColumn> {
+ private static final Logger log =
LoggerFactory.getLogger(DateToStringConverter.class);
+ private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+ private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
+ private DateTimeFormatter datetimeFormatter =
DateTimeFormatter.ISO_DATE_TIME;
+ private DateTimeFormatter timestampFormatter =
DateTimeFormatter.ISO_DATE_TIME;
+ private ZoneId timestampZoneId = ZoneId.systemDefault();
+
+ public static Properties DEFAULT_PROPS = new Properties();
+
+ static {
+ DEFAULT_PROPS.setProperty("converters", "date");
+ DEFAULT_PROPS.setProperty("date.type",
"org.apache.doris.demo.flink.converter.DateToStringConverter");
+ DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd");
+ DEFAULT_PROPS.setProperty("date.format.datetime", "yyyy-MM-dd HH:mm:ss");
+ DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss");
+ DEFAULT_PROPS.setProperty("date.format.timestamp.zone", "UTC");
+ }
+
+ @Override
+ public void configure(Properties props) {
+ readProps(props, "format.date", p -> dateFormatter =
DateTimeFormatter.ofPattern(p));
+ readProps(props, "format.time", p -> timeFormatter =
DateTimeFormatter.ofPattern(p));
+ readProps(props, "format.datetime", p -> datetimeFormatter =
DateTimeFormatter.ofPattern(p));
+ readProps(props, "format.timestamp", p -> timestampFormatter =
DateTimeFormatter.ofPattern(p));
+ readProps(props, "format.timestamp.zone", z -> timestampZoneId =
ZoneId.of(z));
+ }
+
+ private void readProps(Properties properties, String settingKey,
Consumer<String> callback) {
+ String settingValue = (String) properties.get(settingKey);
+ if (settingValue == null || settingValue.length() == 0) {
+ return;
+ }
+ try {
+ callback.accept(settingValue.trim());
+ } catch (IllegalArgumentException | DateTimeException e) {
+ log.error("setting {} is illegal:{}", settingKey, settingValue);
+ throw e;
+ }
+ }
+
+ @Override
+ public void converterFor(RelationalColumn column,
ConverterRegistration<SchemaBuilder> registration) {
+ String sqlType = column.typeName().toUpperCase();
+ SchemaBuilder schemaBuilder = null;
+ Converter converter = null;
+ if ("DATE".equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertDate;
+ }
+ if ("TIME".equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertTime;
+ }
+ if ("DATETIME".equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertDateTime;
+ }
+ if ("TIMESTAMP".equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertTimestamp;
+ }
+ if (schemaBuilder != null) {
+ registration.register(schemaBuilder, converter);
+ }
+ }
+
+ private String convertDate(Object input) {
+ if (input instanceof LocalDate) {
+ return dateFormatter.format((LocalDate) input);
+ } else if (input instanceof Integer) {
+ LocalDate date = LocalDate.ofEpochDay((Integer) input);
+ return dateFormatter.format(date);
+ }
+ return null;
+ }
+
+ private String convertTime(Object input) {
+ if (input instanceof Duration) {
+ Duration duration = (Duration) input;
+ long seconds = duration.getSeconds();
+ int nano = duration.getNano();
+ LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
+ return timeFormatter.format(time);
+ }
+ return null;
+ }
+
+ private String convertDateTime(Object input) {
+ if (input instanceof LocalDateTime) {
+ return datetimeFormatter.format((LocalDateTime) input);
+ } else if (input instanceof Timestamp) {
+ return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());
+ }
+ return null;
+ }
+
+ private String convertTimestamp(Object input) {
+ if (input instanceof ZonedDateTime) {
+ // MySQL TIMESTAMP values are stored in UTC, so the
zonedDateTime received here is in UTC time
+ ZonedDateTime zonedDateTime = (ZonedDateTime) input;
+ LocalDateTime localDateTime =
zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
+ return timestampFormatter.format(localDateTime);
+ } else if (input instanceof Timestamp) {
+ return timestampFormatter.format(((Timestamp) input).toLocalDateTime());
+ }
+ return null;
+ }
+
+}
diff --git
a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java
index 22da0b0f84..49fa87b724 100644
---
a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java
+++
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java
@@ -48,6 +48,7 @@ import java.util.HashMap;
* Synchronize the full database through flink cdc
*
*/
+@Deprecated
public class DatabaseFullDelSync {
private static String SOURCE_DB = "custom_db";//db
diff --git
a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java
index 417543df49..55790222b2 100644
---
a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java
+++
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java
@@ -20,24 +20,25 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.doris.demo.flink.converter.DateToStringConverter;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@@ -60,6 +61,9 @@ public class DatabaseFullSync {
private static String TARGET_DORIS_DB = "test";
public static void main(String[] args) throws Exception {
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+ customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
+
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname(HOST)
.port(MYSQL_PORT)
@@ -67,7 +71,9 @@ public class DatabaseFullSync {
.tableList(SYNC_TBLS) // set captured table
.username(MYSQL_USER)
.password(MYSQL_PASSWD)
- .deserializer(new JsonDebeziumDeserializationSchema()) // converts
SourceRecord to JSON String
+ .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
+ .deserializer(new JsonDebeziumDeserializationSchema(false,
customConverterConfigs))
+ .includeSchemaChanges(true)
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -82,51 +88,12 @@ public class DatabaseFullSync {
LOG.info("sync table list:{}",tableList);
for(String tbl : tableList){
SingleOutputStreamOperator<String> filterStream =
filterTableData(cdcSource, tbl);
- SingleOutputStreamOperator<String> cleanStream =
clean(filterStream);
DorisSink dorisSink = buildDorisSink(tbl);
- cleanStream.sinkTo(dorisSink).name("sink " + tbl);
+ filterStream.sinkTo(dorisSink).name("sink " + tbl);
}
env.execute("Full Database Sync ");
}
- /**
- * Get real data
- * {
- * "before":null,
- * "after":{
- * "id":1,
- * "name":"zhangsan-1",
- * "age":18
- * },
- * "source":{
- * "db":"test",
- * "table":"test_1",
- * ...
- * },
- * "op":"c",
- * ...
- * }
- * */
- private static SingleOutputStreamOperator<String>
clean(SingleOutputStreamOperator<String> source) {
- return source.flatMap(new FlatMapFunction<String,String>(){
- @Override
- public void flatMap(String row, Collector<String> out) throws
Exception {
- try{
- JSONObject rowJson = JSON.parseObject(row);
- String op = rowJson.getString("op");
- //history,insert,update
- if(Arrays.asList("r","c","u").contains(op)){
-
out.collect(rowJson.getJSONObject("after").toJSONString());
- }else{
- LOG.info("filter other op:{}",op);
- }
- }catch (Exception ex){
- LOG.warn("filter other format binlog:{}",row);
- }
- }
- });
- }
-
/**
* Divide according to tablename
* */
@@ -182,11 +149,11 @@ public class DatabaseFullSync {
pro.setProperty("read_json_by_line", "true");
DorisExecutionOptions executionOptions =
DorisExecutionOptions.builder()
.setLabelPrefix("label-" + table + UUID.randomUUID()) //streamload
label prefix,
- .setStreamLoadProp(pro).build();
+ .setStreamLoadProp(pro).setDeletable(true).build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionOptions)
- .setSerializer(new SimpleStringSerializer()) //serialize according
to string
+
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisBuilder.build()).build())
.setDorisOptions(dorisBuilder.build());
return builder.build();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]