This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c4ab9a2 [Feature] add postgresql cdc (#178)
c4ab9a2 is described below
commit c4ab9a2a922011160b8ea5145020ddae43def22b
Author: wudi <[email protected]>
AuthorDate: Tue Aug 15 11:21:51 2023 +0800
[Feature] add postgresql cdc (#178)
---
flink-doris-connector/pom.xml | 6 +
.../sink/writer/JsonDebeziumSchemaSerializer.java | 16 +-
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 13 ++
.../tools/cdc/postgres/PostgresDatabaseSync.java | 189 +++++++++++++++++++++
.../tools/cdc/postgres/PostgresDateConverter.java | 120 +++++++++++++
.../flink/tools/cdc/postgres/PostgresSchema.java | 33 ++++
.../flink/tools/cdc/postgres/PostgresType.java | 146 ++++++++++++++++
.../tools/cdc/CdcPostgresSyncDatabaseCase.java | 84 +++++++++
8 files changed, 599 insertions(+), 8 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index ec27c58..73a748c 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -254,6 +254,12 @@ under the License.
<version>2.4.1</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-sql-connector-postgres-cdc</artifactId>
+ <version>2.4.1</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index fa3b096..38ae229 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -137,7 +137,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
if (newSchemaChange && firstLoad) {
initOriginFieldSchema(recordRoot);
}
- Map<String, String> valueMap;
+ Map<String, Object> valueMap;
switch (op) {
case OP_READ:
case OP_CREATE:
@@ -166,14 +166,14 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
StringBuilder updateRow = new StringBuilder();
if(!ignoreUpdateBefore){
//convert delete
- Map<String, String> beforeRow = extractBeforeRow(recordRoot);
+ Map<String, Object> beforeRow = extractBeforeRow(recordRoot);
addDeleteSign(beforeRow, true);
updateRow.append(objectMapper.writeValueAsString(beforeRow))
.append(this.lineDelimiter);
}
//convert insert
- Map<String, String> afterRow = extractAfterRow(recordRoot);
+ Map<String, Object> afterRow = extractAfterRow(recordRoot);
addDeleteSign(afterRow, false);
updateRow.append(objectMapper.writeValueAsString(afterRow));
return updateRow.toString().getBytes(StandardCharsets.UTF_8);
@@ -274,7 +274,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
return sourceTableName.equals(dbTbl);
}
- private void addDeleteSign(Map<String, String> valueMap, boolean delete) {
+ private void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
if(delete){
valueMap.put(DORIS_DELETE_SIGN, "1");
}else{
@@ -374,16 +374,16 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
!(record.get(key) instanceof NullNode) ?
record.get(key).asText() : null;
}
- private Map<String, String> extractBeforeRow(JsonNode record) {
+ private Map<String, Object> extractBeforeRow(JsonNode record) {
return extractRow(record.get("before"));
}
- private Map<String, String> extractAfterRow(JsonNode record) {
+ private Map<String, Object> extractAfterRow(JsonNode record) {
return extractRow(record.get("after"));
}
- private Map<String, String> extractRow(JsonNode recordRow) {
- Map<String, String> recordMap = objectMapper.convertValue(recordRow, new TypeReference<Map<String, String>>() {
+ private Map<String, Object> extractRow(JsonNode recordRow) {
+ Map<String, Object> recordMap = objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {
});
return recordMap != null ? recordMap : new HashMap<>();
}
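[Note: the switch from Map<String, String> to Map<String, Object> matters for Postgres because Debezium row images can carry non-string scalars and nested values (e.g. array or json/jsonb columns), which an Object-valued map lets Jackson round-trip as valid JSON. A minimal standalone sketch, not part of the commit, assuming only Jackson:

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.Map;

    public class RowConversionSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // A Debezium "after" image with non-string values, e.g. an int4[] column.
            JsonNode row = mapper.readTree("{\"id\": 1, \"active\": true, \"tags\": [1, 2]}");

            // With Object values, numbers, booleans and nested arrays survive as-is,
            // so writeValueAsString() emits valid JSON for the downstream load.
            Map<String, Object> typed = mapper.convertValue(row, new TypeReference<Map<String, Object>>() {});
            System.out.println(mapper.writeValueAsString(typed)); // {"id":1,"active":true,"tags":[1,2]}

            // A Map<String, String> target would throw on the nested "tags" value,
            // which is what the Object-valued signatures above avoid.
        }
    }
]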
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 6d10266..754dbce 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.tools.cdc;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
+import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -35,6 +36,7 @@ import java.util.Map;
public class CdcTools {
private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
+ private static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database";
private static final List<String> EMPTY_KEYS = Arrays.asList("password");
public static void main(String[] args) throws Exception {
@@ -48,6 +50,9 @@ public class CdcTools {
case ORACLE_SYNC_DATABASE:
createOracleSyncDatabase(opArgs);
break;
+ case POSTGRES_SYNC_DATABASE:
+ createPostgresSyncDatabase(opArgs);
+ break;
default:
System.out.println("Unknown operation " + operation);
System.exit(1);
@@ -70,6 +75,14 @@ public class CdcTools {
syncDatabase(params, databaseSync, oracleConfig, "Oracle");
}
+ private static void createPostgresSyncDatabase(String[] opArgs) throws Exception {
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ Map<String, String> postgresMap = getConfigMap(params, "postgres-conf");
+ Configuration postgresConfig = Configuration.fromMap(postgresMap);
+ DatabaseSync databaseSync = new PostgresDatabaseSync();
+ syncDatabase(params, databaseSync, postgresConfig, "Postgres");
+ }
+
private static void syncDatabase(MultipleParameterTool params, DatabaseSync databaseSync, Configuration config, String type) throws Exception {
String jobName = params.get("job-name");
String database = params.get("database");
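[Note: with this change, a whole Postgres database can be synced through CdcTools. A sketch of an invocation follows; the postgres-sync-database operation, the postgres-conf flag, and job-name/database come from the code above, while the remaining flag names follow the existing mysql/oracle operations and the test case at the end of this patch, so treat them as illustrative:

    // Programmatic sketch; the same arguments apply when submitting CdcTools
    // via `flink run`. Values are placeholders.
    String[] args = {
            "postgres-sync-database",
            "--job-name", "postgres-to-doris",
            "--database", "db2",
            "--postgres-conf", "hostname=127.0.0.1",
            "--postgres-conf", "port=5432",
            "--postgres-conf", "username=postgres",
            "--postgres-conf", "password=123456",
            "--postgres-conf", "database-name=postgres",
            "--postgres-conf", "schema-name=public",
            "--postgres-conf", "slot.name=test",
            "--postgres-conf", "decoding.plugin.name=pgoutput",
            "--sink-conf", "fenodes=127.0.0.1:8030",
            "--sink-conf", "username=root",
            "--sink-conf", "password=",
            "--sink-conf", "jdbc-url=jdbc:mysql://127.0.0.1:9030",
            "--table-conf", "replication_num=1",
            "--including-tables", "testcdc"
    };
    CdcTools.main(args);
]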
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
new file mode 100644
index 0000000..31878a5
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.postgres;
+
+
+import com.ververica.cdc.connectors.base.options.SourceOptions;
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
+import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME;
+import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
+import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME;
+
+public class PostgresDatabaseSync extends DatabaseSync {
+ private static final Logger LOG = LoggerFactory.getLogger(PostgresDatabaseSync.class);
+
+ private static String JDBC_URL = "jdbc:postgresql://%s:%d/%s";
+
+ public PostgresDatabaseSync() {
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ String jdbcUrl = String.format(JDBC_URL, config.get(PostgresSourceOptions.HOSTNAME), config.get(PostgresSourceOptions.PG_PORT),config.get(PostgresSourceOptions.DATABASE_NAME));
+ Properties pro = new Properties();
+ pro.setProperty("user", config.get(PostgresSourceOptions.USERNAME));
+ pro.setProperty("password",
config.get(PostgresSourceOptions.PASSWORD));
+ return DriverManager.getConnection(jdbcUrl, pro);
+ }
+
+ @Override
+ public List<SourceSchema> getSchemaList() throws Exception {
+ String databaseName = config.get(PostgresSourceOptions.DATABASE_NAME);
+ String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
+ List<SourceSchema> schemaList = new ArrayList<>();
+ LOG.info("database-name {}, schema-name {}", databaseName, schemaName);
+ try (Connection conn = getConnection()) {
+ DatabaseMetaData metaData = conn.getMetaData();
+ try (ResultSet tables =
+ metaData.getTables(databaseName, schemaName, "%", new String[]{"TABLE"})) {
+ while (tables.next()) {
+ String tableName = tables.getString("TABLE_NAME");
+ String tableComment = tables.getString("REMARKS");
+ if (!isSyncNeeded(tableName)) {
+ continue;
+ }
+ SourceSchema sourceSchema =
+ new PostgresSchema(metaData, databaseName, schemaName, tableName, tableComment);
+ sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE);
+ schemaList.add(sourceSchema);
+ }
+ }
+ }
+ return schemaList;
+ }
+
+ @Override
+ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
+ String databaseName = config.get(PostgresSourceOptions.DATABASE_NAME);
+ String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
+ String slotName = config.get(SLOT_NAME);
+ Preconditions.checkNotNull(databaseName, "database-name in postgres is required");
+ Preconditions.checkNotNull(schemaName, "schema-name in postgres is required");
+ Preconditions.checkNotNull(slotName, "slot.name in postgres is required");
+
+ String tableName = config.get(PostgresSourceOptions.TABLE_NAME);
+ String hostname = config.get(PostgresSourceOptions.HOSTNAME);
+ Integer port = config.get(PostgresSourceOptions.PG_PORT);
+ String username = config.get(PostgresSourceOptions.USERNAME);
+ String password = config.get(PostgresSourceOptions.PASSWORD);
+
+ StartupOptions startupOptions = StartupOptions.initial();
+ String startupMode = config.get(PostgresSourceOptions.SCAN_STARTUP_MODE);
+ if ("initial".equalsIgnoreCase(startupMode)) {
+ startupOptions = StartupOptions.initial();
+ } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+ startupOptions = StartupOptions.latest();
+ }
+
+ //debezium properties set
+ Properties debeziumProperties = new Properties();
+ debeziumProperties.putAll(PostgresDateConverter.DEFAULT_PROPS);
+ debeziumProperties.put("decimal.handling.mode", "string");
+
+ for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+ debeziumProperties.put(
+ key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
+ }
+ }
+
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+ JsonDebeziumDeserializationSchema schema =
+ new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+
+ if(config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)){
+ JdbcIncrementalSource<String> incrSource = PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
+ .hostname(hostname)
+ .port(port)
+ .database(databaseName)
+ .schemaList(schemaName)
+ .tableList(schemaName + "." + tableName)
+ .username(username)
+ .password(password)
+ .deserializer(schema)
+ .slotName(slotName)
+ .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
+ .includeSchemaChanges(true)
+ .debeziumProperties(debeziumProperties)
+ .startupOptions(startupOptions)
+ .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
+ .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
+ .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
+ .connectTimeout(config.get(CONNECT_TIMEOUT))
+ .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
+ .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
+ .distributionFactorUpper(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
+ .distributionFactorLower(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
+ .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
+ .build();
+ return env.fromSource(incrSource, WatermarkStrategy.noWatermarks(), "Postgres IncrSource");
+ }else{
+ DebeziumSourceFunction<String> postgresSource = PostgreSQLSource.<String>builder()
+ .hostname(hostname)
+ .port(port)
+ .database(databaseName)
+ .schemaList(schemaName)
+ .tableList(schemaName + "." + tableName)
+ .username(username)
+ .password(password)
+ .debeziumProperties(debeziumProperties)
+ .deserializer(schema)
+ .slotName(slotName)
+ .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
+ .build();
+ return env.addSource(postgresSource, "Postgres Source");
+ }
+ }
+}
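[Note: buildCdcSource above selects between two runtime paths: with scan.incremental.snapshot.enabled=true it builds the parallel PostgresIncrementalSource, otherwise the single-task DebeziumSourceFunction. A minimal sketch of the source options it consumes; the keys are the ones read above, the values are placeholders:

    // Minimal source configuration for PostgresDatabaseSync.buildCdcSource.
    Map<String, String> sourceConfig = new HashMap<>();
    sourceConfig.put("hostname", "127.0.0.1");
    sourceConfig.put("port", "5432");
    sourceConfig.put("username", "postgres");
    sourceConfig.put("password", "123456");
    sourceConfig.put("database-name", "postgres");        // required
    sourceConfig.put("schema-name", "public");            // required
    sourceConfig.put("slot.name", "test");                // required: logical replication slot
    sourceConfig.put("decoding.plugin.name", "pgoutput");
    sourceConfig.put("scan.startup.mode", "initial");     // or "latest-offset"
    // true -> parallel PostgresIncrementalSource; false/absent -> DebeziumSourceFunction
    sourceConfig.put("scan.incremental.snapshot.enabled", "true");
    // Keys prefixed with "debezium." are stripped and passed straight to Debezium.
    sourceConfig.put("debezium.include.schema.changes", "false");
    Configuration config = Configuration.fromMap(sourceConfig);
]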
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java
new file mode 100644
index 0000000..b91269d
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.postgres;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class PostgresDateConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+ private static final Logger log = LoggerFactory.getLogger(PostgresDateConverter.class);
+ private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+ private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
+
+ public static Properties DEFAULT_PROPS = new Properties();
+
+ static {
+ DEFAULT_PROPS.setProperty("converters", "date");
+ DEFAULT_PROPS.setProperty("date.type",
"org.apache.doris.flink.tools.cdc.postgres.PostgresDateConverter");
+ DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd");
+ DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd
HH:mm:ss.SSSSSS");
+ }
+
+ @Override
+ public void configure(Properties props) {
+ readProps(props, "format.date", p -> dateFormatter =
DateTimeFormatter.ofPattern(p));
+ readProps(props, "format.timestamp", p -> timestampFormatter =
DateTimeFormatter.ofPattern(p));
+ }
+
+ private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
+ String settingValue = (String) properties.get(settingKey);
+ if (settingValue == null || settingValue.length() == 0) {
+ return;
+ }
+ try {
+ callback.accept(settingValue.trim());
+ } catch (IllegalArgumentException | DateTimeException e) {
+ log.error("setting {} is illegal:{}", settingKey, settingValue);
+ throw e;
+ }
+ }
+
+ @Override
+ public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
+ String sqlType = column.typeName().toUpperCase();
+ SchemaBuilder schemaBuilder = null;
+ Converter converter = null;
+ if ("DATE".equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertDate;
+ }
+ if ("TIME".equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertTime;
+ }
+ if ("TIMESTAMP".equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertTimestamp;
+ }
+ if (schemaBuilder != null) {
+ registration.register(schemaBuilder, converter);
+ }
+ }
+
+ private String convertDate(Object input) {
+ if (input instanceof LocalDate) {
+ return dateFormatter.format((LocalDate) input);
+ } else if (input instanceof Integer) {
+ LocalDate date = LocalDate.ofEpochDay((Integer) input);
+ return dateFormatter.format(date);
+ } else if (input instanceof Date){
+ return dateFormatter.format(((Date) input).toLocalDate());
+ }
+ return null;
+ }
+
+ private String convertTime(Object input) {
+ if (input instanceof String) {
+ return input.toString();
+ }
+ return null;
+ }
+
+ private String convertTimestamp(Object input) {
+ if (input instanceof Timestamp) {
+ return timestampFormatter.format(((Timestamp) input).toLocalDateTime());
+ } else if (input instanceof Instant){
+ LocalDateTime ldt = LocalDateTime.ofInstant(((Instant) input), ZoneOffset.UTC);
+ return timestampFormatter.format(ldt);
+ }
+ return null;
+ }
+}
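[Note: the converter is wired in through DEFAULT_PROPS: "converters=date" names the converter instance and "date.type" points Debezium at this class. Debezium strips the instance prefix before calling configure(), so "date.format.timestamp" arrives as "format.timestamp". A sketch of overriding the output patterns; the prefix routing is standard Debezium CustomConverter behavior, and the override value is illustrative:

    Properties debeziumProperties = new Properties();
    debeziumProperties.putAll(PostgresDateConverter.DEFAULT_PROPS);
    // Illustrative override: seconds precision instead of microseconds;
    // reaches configure() above as "format.timestamp".
    debeziumProperties.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss");
]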
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
new file mode 100644
index 0000000..de9d869
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.postgres;
+
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.sql.DatabaseMetaData;
+
+public class PostgresSchema extends SourceSchema {
+
+ public PostgresSchema(DatabaseMetaData metaData, String databaseName, String schemaName, String tableName, String tableComment) throws Exception {
+ super(metaData, databaseName, schemaName, tableName, tableComment);
+ }
+
+ @Override
+ public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
+ return PostgresType.toDorisType(fieldType, precision, scale);
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java
new file mode 100644
index 0000000..87cbde2
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.postgres;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.apache.flink.util.Preconditions;
+
+public class PostgresType {
+ private static final String INT2 = "int2";
+ private static final String SMALLSERIAL = "smallserial";
+ private static final String INT4 = "int4";
+ private static final String SERIAL = "serial";
+ private static final String INT8 = "int8";
+ private static final String BIGSERIAL = "bigserial";
+ private static final String NUMERIC = "numeric";
+ private static final String FLOAT4 = "float4";
+ private static final String FLOAT8 = "float8";
+ private static final String BPCHAR = "bpchar";
+ private static final String TIMESTAMP = "timestamp";
+ private static final String TIMESTAMPTZ = "timestamptz";
+ private static final String DATE = "date";
+ private static final String BOOL = "bool";
+ private static final String BIT = "bit";
+ private static final String POINT = "point";
+ private static final String LINE = "line";
+ private static final String LSEG = "lseg";
+ private static final String BOX = "box";
+ private static final String PATH = "path";
+ private static final String POLYGON = "polygon";
+ private static final String CIRCLE = "circle";
+ private static final String VARCHAR = "varchar";
+ private static final String TEXT = "text";
+ private static final String TIME = "time";
+ private static final String TIMETZ = "timetz";
+ private static final String INTERVAL = "interval";
+ private static final String CIDR = "cidr";
+ private static final String INET = "inet";
+ private static final String MACADDR = "macaddr";
+ private static final String VARBIT = "varbit";
+ private static final String UUID = "uuid";
+ private static final String BYTEA = "bytea";
+ private static final String JSON = "json";
+ private static final String JSONB = "jsonb";
+ private static final String _INT2 = "_int2";
+ private static final String _INT4 = "_int4";
+ private static final String _INT8 = "_int8";
+ private static final String _FLOAT4 = "_float4";
+ private static final String _FLOAT8 = "_float8";
+ private static final String _DATE = "_date";
+ private static final String _TIMESTAMP = "_timestamp";
+ private static final String _BOOL = "_bool";
+ private static final String _TEXT = "_text";
+
+ public static String toDorisType(String postgresType, Integer precision, Integer scale) {
+ postgresType = postgresType.toLowerCase();
+ switch (postgresType){
+ case INT2:
+ case SMALLSERIAL:
+ return DorisType.TINYINT;
+ case INT4:
+ case SERIAL:
+ return DorisType.INT;
+ case INT8:
+ case BIGSERIAL:
+ return DorisType.BIGINT;
+ case NUMERIC:
+ return precision != null && precision <= 38
+ ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0)
+ : DorisType.STRING;
+ case FLOAT4:
+ return DorisType.FLOAT;
+ case FLOAT8:
+ return DorisType.DOUBLE;
+ case BPCHAR:
+ Preconditions.checkNotNull(precision);
+ return String.format("%s(%s)", DorisType.CHAR, precision);
+ case TIMESTAMP:
+ case TIMESTAMPTZ:
+ return String.format("%s(%s)", DorisType.DATETIME_V2,
Math.min(precision == null ? 0 : precision, 6));
+ case DATE:
+ return DorisType.DATE_V2;
+ case BOOL:
+ return DorisType.BOOLEAN;
+ case BIT:
+ return precision == 1 ? DorisType.BOOLEAN : DorisType.STRING;
+ case VARCHAR:
+ Preconditions.checkNotNull(precision);
+ return precision * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, precision * 3);
+ case POINT:
+ case LINE:
+ case LSEG:
+ case BOX:
+ case PATH:
+ case POLYGON:
+ case CIRCLE:
+ case TEXT:
+ case TIME:
+ case TIMETZ:
+ case INTERVAL:
+ case CIDR:
+ case INET:
+ case MACADDR:
+ case VARBIT:
+ case UUID:
+ case BYTEA:
+ return DorisType.STRING;
+ case JSON:
+ case JSONB:
+ return DorisType.JSONB;
+ case _BOOL:
+ return String.format("%s<%s>", DorisType.ARRAY,
DorisType.BOOLEAN);
+ case _INT2:
+ return String.format("%s<%s>", DorisType.ARRAY,
DorisType.TINYINT);
+ case _INT4:
+ return String.format("%s<%s>", DorisType.ARRAY, DorisType.INT);
+ case _INT8:
+ return String.format("%s<%s>", DorisType.ARRAY,
DorisType.BIGINT);
+ case _FLOAT4:
+ return String.format("%s<%s>", DorisType.ARRAY,
DorisType.FLOAT);
+ case _FLOAT8:
+ return String.format("%s<%s>", DorisType.ARRAY,
DorisType.DOUBLE);
+ case _TEXT:
+ return String.format("%s<%s>", DorisType.ARRAY,
DorisType.STRING);
+ case _DATE:
+ return String.format("%s<%s>", DorisType.ARRAY,
DorisType.DATE_V2);
+ case _TIMESTAMP:
+ return String.format("%s<%s>", DorisType.ARRAY,
DorisType.DATETIME_V2);
+ default:
+ throw new UnsupportedOperationException("Unsupported Postgres
Type: " + postgresType);
+ }
+ }
+}
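[Note: a few worked examples of the mapping above; expected outputs are derived from the switch cases, and the exact DorisType constant spellings are assumptions:

    PostgresType.toDorisType("numeric", 10, 2);        // DECIMALV3(10,2): precision <= 38
    PostgresType.toDorisType("numeric", 39, 2);        // STRING: precision > 38
    PostgresType.toDorisType("bpchar", 10, null);      // CHAR(10)
    PostgresType.toDorisType("varchar", 20, null);     // VARCHAR(60): precision * 3 bytes for UTF-8
    PostgresType.toDorisType("timestamptz", 6, null);  // DATETIMEV2(6)
    PostgresType.toDorisType("_int4", null, null);     // ARRAY<INT>
]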
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
new file mode 100644
index 0000000..4d5b485
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcPostgresSyncDatabaseCase {
+
+ public static void main(String[] args) throws Exception{
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.disableOperatorChaining();
+ env.enableCheckpointing(10000);
+
+// Map<String,String> flinkMap = new HashMap<>();
+// flinkMap.put("execution.checkpointing.interval","10s");
+// flinkMap.put("pipeline.operator-chaining","false");
+// flinkMap.put("parallelism.default","1");
+//
+//
+// Configuration configuration = Configuration.fromMap(flinkMap);
+// env.configure(configuration);
+
+ String database = "db2";
+ String tablePrefix = "";
+ String tableSuffix = "";
+ Map<String,String> sourceConfig = new HashMap<>();
+ sourceConfig.put("database-name","postgres");
+ sourceConfig.put("schema-name","public");
+ sourceConfig.put("slot.name","test");
+ sourceConfig.put("decoding.plugin.name","pgoutput");
+ sourceConfig.put("hostname","127.0.0.1");
+ sourceConfig.put("port","5432");
+ sourceConfig.put("username","postgres");
+ sourceConfig.put("password","123456");
+// sourceConfig.put("debezium.database.tablename.case.insensitive","false");
+// sourceConfig.put("scan.incremental.snapshot.enabled","true");
+// sourceConfig.put("debezium.include.schema.changes","false");
+
+ Configuration config = Configuration.fromMap(sourceConfig);
+
+ Map<String,String> sinkConfig = new HashMap<>();
+ sinkConfig.put("fenodes","127.0.0.1:8737");
+ sinkConfig.put("username","root");
+ sinkConfig.put("password","");
+ sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9737");
+ sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+ Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+ Map<String,String> tableConfig = new HashMap<>();
+ tableConfig.put("replication_num", "1");
+
+ String includingTables = "testcdc";
+ String excludingTables = "";
+ boolean ignoreDefaultValue = false;
+ boolean useNewSchemaChange = false;
+ DatabaseSync databaseSync = new PostgresDatabaseSync();
+ databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.build();
+ env.execute(String.format("Postgres-Doris Database Sync: %s",
database));
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]