This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c4ab9a2  [Feature] add postgresql cdc (#178)
c4ab9a2 is described below

commit c4ab9a2a922011160b8ea5145020ddae43def22b
Author: wudi <[email protected]>
AuthorDate: Tue Aug 15 11:21:51 2023 +0800

    [Feature] add postgresql cdc (#178)
---
 flink-doris-connector/pom.xml                      |   6 +
 .../sink/writer/JsonDebeziumSchemaSerializer.java  |  16 +-
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |  13 ++
 .../tools/cdc/postgres/PostgresDatabaseSync.java   | 189 +++++++++++++++++++++
 .../tools/cdc/postgres/PostgresDateConverter.java  | 120 +++++++++++++
 .../flink/tools/cdc/postgres/PostgresSchema.java   |  33 ++++
 .../flink/tools/cdc/postgres/PostgresType.java     | 146 ++++++++++++++++
 .../tools/cdc/CdcPostgresSyncDatabaseCase.java     |  84 +++++++++
 8 files changed, 599 insertions(+), 8 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index ec27c58..73a748c 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -254,6 +254,12 @@ under the License.
             <version>2.4.1</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
+            <version>2.4.1</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-runtime-web</artifactId>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index fa3b096..38ae229 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -137,7 +137,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         if (newSchemaChange && firstLoad) {
             initOriginFieldSchema(recordRoot);
         }
-        Map<String, String> valueMap;
+        Map<String, Object> valueMap;
         switch (op) {
             case OP_READ:
             case OP_CREATE:
@@ -166,14 +166,14 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         StringBuilder updateRow = new StringBuilder();
         if(!ignoreUpdateBefore){
             //convert delete
-            Map<String, String> beforeRow = extractBeforeRow(recordRoot);
+            Map<String, Object> beforeRow = extractBeforeRow(recordRoot);
             addDeleteSign(beforeRow, true);
             updateRow.append(objectMapper.writeValueAsString(beforeRow))
                     .append(this.lineDelimiter);
         }
 
         //convert insert
-        Map<String, String> afterRow = extractAfterRow(recordRoot);
+        Map<String, Object> afterRow = extractAfterRow(recordRoot);
         addDeleteSign(afterRow, false);
         updateRow.append(objectMapper.writeValueAsString(afterRow));
         return updateRow.toString().getBytes(StandardCharsets.UTF_8);
@@ -274,7 +274,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return sourceTableName.equals(dbTbl);
     }
 
-    private void addDeleteSign(Map<String, String> valueMap, boolean delete) {
+    private void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
         if(delete){
             valueMap.put(DORIS_DELETE_SIGN, "1");
         }else{
@@ -374,16 +374,16 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
                 !(record.get(key) instanceof NullNode) ? record.get(key).asText() : null;
     }
 
-    private Map<String, String> extractBeforeRow(JsonNode record) {
+    private Map<String, Object> extractBeforeRow(JsonNode record) {
         return extractRow(record.get("before"));
     }
 
-    private Map<String, String> extractAfterRow(JsonNode record) {
+    private Map<String, Object> extractAfterRow(JsonNode record) {
         return extractRow(record.get("after"));
     }
 
-    private Map<String, String> extractRow(JsonNode recordRow) {
-        Map<String, String> recordMap = objectMapper.convertValue(recordRow, new TypeReference<Map<String, String>>() {
+    private Map<String, Object> extractRow(JsonNode recordRow) {
+        Map<String, Object> recordMap = objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {
         });
         return recordMap != null ? recordMap : new HashMap<>();
     }
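
[Editor's note] For intuition on the Map<String, String> -> Map<String, Object> change
above: Jackson cannot coerce non-scalar JSON values (for example a Postgres array column
arriving as a JSON array in the Debezium record) into String map values, while Object
values round-trip cleanly through writeValueAsString. A minimal sketch, assuming a
Debezium-style row:

    ObjectMapper mapper = new ObjectMapper();
    JsonNode row = mapper.readTree("{\"id\": 1, \"tags\": [\"a\", \"b\"]}");
    // convertValue(row, new TypeReference<Map<String, String>>() {}) would fail on "tags"
    Map<String, Object> m = mapper.convertValue(row, new TypeReference<Map<String, Object>>() {});
    mapper.writeValueAsString(m); // {"id":1,"tags":["a","b"]}
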
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 6d10266..754dbce 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.tools.cdc;
 
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
 import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
+import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -35,6 +36,7 @@ import java.util.Map;
 public class CdcTools {
     private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
     private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
+    private static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database";
     private static final List<String> EMPTY_KEYS = Arrays.asList("password");
 
     public static void main(String[] args) throws Exception {
@@ -48,6 +50,9 @@ public class CdcTools {
             case ORACLE_SYNC_DATABASE:
                 createOracleSyncDatabase(opArgs);
                 break;
+            case POSTGRES_SYNC_DATABASE:
+                createPostgresSyncDatabase(opArgs);
+                break;
             default:
                 System.out.println("Unknown operation " + operation);
                 System.exit(1);
@@ -70,6 +75,14 @@ public class CdcTools {
         syncDatabase(params, databaseSync, oracleConfig, "Oracle");
     }
 
+    private static void createPostgresSyncDatabase(String[] opArgs) throws Exception {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        Map<String, String> postgresMap = getConfigMap(params, "postgres-conf");
+        Configuration postgresConfig = Configuration.fromMap(postgresMap);
+        DatabaseSync databaseSync = new PostgresDatabaseSync();
+        syncDatabase(params, databaseSync, postgresConfig, "Postgres");
+    }
+
     private static void syncDatabase(MultipleParameterTool params, DatabaseSync databaseSync, Configuration config, String type) throws Exception {
         String jobName = params.get("job-name");
         String database = params.get("database");
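
[Editor's note] For reference, a hedged sketch of invoking the new operation through
CdcTools. The operation name and the postgres-conf prefix come from this diff; the
remaining option names mirror the existing mysql-sync-database tool and the test case
at the end of this commit, and all values (jar name, hosts, ports) are placeholders:

    <FLINK_HOME>/bin/flink run \
        -c org.apache.doris.flink.tools.cdc.CdcTools \
        flink-doris-connector-<version>.jar \
        postgres-sync-database \
        --job-name postgres-sync-test \
        --database db2 \
        --postgres-conf hostname=127.0.0.1 \
        --postgres-conf port=5432 \
        --postgres-conf username=postgres \
        --postgres-conf password=123456 \
        --postgres-conf database-name=postgres \
        --postgres-conf schema-name=public \
        --postgres-conf slot.name=test \
        --postgres-conf decoding.plugin.name=pgoutput \
        --including-tables "testcdc" \
        --sink-conf fenodes=127.0.0.1:8030 \
        --sink-conf username=root \
        --sink-conf password= \
        --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
        --sink-conf sink.label-prefix=label \
        --table-conf replication_num=1
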
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
new file mode 100644
index 0000000..31878a5
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.postgres;
+
+
+import com.ververica.cdc.connectors.base.options.SourceOptions;
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
+import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME;
+import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
+import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME;
+
+public class PostgresDatabaseSync extends DatabaseSync {
+    private static final Logger LOG = LoggerFactory.getLogger(PostgresDatabaseSync.class);
+
+    private static String JDBC_URL = "jdbc:postgresql://%s:%d/%s";
+
+    public PostgresDatabaseSync() {
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        String jdbcUrl = String.format(JDBC_URL, config.get(PostgresSourceOptions.HOSTNAME), config.get(PostgresSourceOptions.PG_PORT), config.get(PostgresSourceOptions.DATABASE_NAME));
+        Properties pro = new Properties();
+        pro.setProperty("user", config.get(PostgresSourceOptions.USERNAME));
+        pro.setProperty("password", 
config.get(PostgresSourceOptions.PASSWORD));
+        return DriverManager.getConnection(jdbcUrl, pro);
+    }
+
+    @Override
+    public List<SourceSchema> getSchemaList() throws Exception {
+        String databaseName = config.get(PostgresSourceOptions.DATABASE_NAME);
+        String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
+        List<SourceSchema> schemaList = new ArrayList<>();
+        LOG.info("database-name {}, schema-name {}", databaseName, schemaName);
+        try (Connection conn = getConnection()) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            try (ResultSet tables =
+                         metaData.getTables(databaseName, schemaName, "%", new String[]{"TABLE"})) {
+                while (tables.next()) {
+                    String tableName = tables.getString("TABLE_NAME");
+                    String tableComment = tables.getString("REMARKS");
+                    if (!isSyncNeeded(tableName)) {
+                        continue;
+                    }
+                    SourceSchema sourceSchema =
+                            new PostgresSchema(metaData, databaseName, schemaName, tableName, tableComment);
+                    sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE);
+                    schemaList.add(sourceSchema);
+                }
+            }
+        }
+        return schemaList;
+    }
+
+    @Override
+    public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
+        String databaseName = config.get(PostgresSourceOptions.DATABASE_NAME);
+        String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
+        String slotName = config.get(SLOT_NAME);
+        Preconditions.checkNotNull(databaseName, "database-name in postgres is required");
+        Preconditions.checkNotNull(schemaName, "schema-name in postgres is required");
+        Preconditions.checkNotNull(slotName, "slot.name in postgres is required");
+
+        String tableName = config.get(PostgresSourceOptions.TABLE_NAME);
+        String hostname = config.get(PostgresSourceOptions.HOSTNAME);
+        Integer port = config.get(PostgresSourceOptions.PG_PORT);
+        String username = config.get(PostgresSourceOptions.USERNAME);
+        String password = config.get(PostgresSourceOptions.PASSWORD);
+
+        StartupOptions startupOptions = StartupOptions.initial();
+        String startupMode = config.get(PostgresSourceOptions.SCAN_STARTUP_MODE);
+        if ("initial".equalsIgnoreCase(startupMode)) {
+            startupOptions = StartupOptions.initial();
+        } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+            startupOptions = StartupOptions.latest();
+        }
+
+        //debezium properties set
+        Properties debeziumProperties = new Properties();
+        debeziumProperties.putAll(PostgresDateConverter.DEFAULT_PROPS);
+        debeziumProperties.put("decimal.handling.mode", "string");
+
+        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+                debeziumProperties.put(
+                        key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
+            }
+        }
+
+        Map<String, Object> customConverterConfigs = new HashMap<>();
+        JsonDebeziumDeserializationSchema schema =
+                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+
+        if(config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)){
+            JdbcIncrementalSource<String> incrSource = PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
+                    .hostname(hostname)
+                    .port(port)
+                    .database(databaseName)
+                    .schemaList(schemaName)
+                    .tableList(schemaName + "." + tableName)
+                    .username(username)
+                    .password(password)
+                    .deserializer(schema)
+                    .slotName(slotName)
+                    .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
+                    .includeSchemaChanges(true)
+                    .debeziumProperties(debeziumProperties)
+                    .startupOptions(startupOptions)
+                    .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
+                    .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
+                    .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
+                    .connectTimeout(config.get(CONNECT_TIMEOUT))
+                    .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
+                    .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
+                    .distributionFactorUpper(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
+                    .distributionFactorLower(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
+                    .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
+                    .build();
+            return env.fromSource(incrSource, WatermarkStrategy.noWatermarks(), "Postgres IncrSource");
+        }else{
+            DebeziumSourceFunction<String> postgresSource = PostgreSQLSource.<String>builder()
+                    .hostname(hostname)
+                    .port(port)
+                    .database(databaseName)
+                    .schemaList(schemaName)
+                    .tableList(schemaName + "." + tableName)
+                    .username(username)
+                    .password(password)
+                    .debeziumProperties(debeziumProperties)
+                    .deserializer(schema)
+                    .slotName(slotName)
+                    .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
+                    .build();
+            return env.addSource(postgresSource, "Postgres Source");
+        }
+    }
+}
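
[Editor's note] Note the two source paths above: when scan.incremental.snapshot.enabled
is true, the job uses the incremental-snapshot source via env.fromSource (parallel
snapshot reading through the new Source API); otherwise it falls back to the
single-threaded DebeziumSourceFunction via env.addSource. A hedged sketch of enabling
the incremental path (the key also appears, commented out, in the test case below):

    --postgres-conf scan.incremental.snapshot.enabled=true
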
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java
new file mode 100644
index 0000000..b91269d
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.postgres;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class PostgresDateConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+    private static final Logger log = LoggerFactory.getLogger(PostgresDateConverter.class);
+    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
+
+    public static Properties DEFAULT_PROPS = new Properties();
+
+    static {
+        DEFAULT_PROPS.setProperty("converters", "date");
+        DEFAULT_PROPS.setProperty("date.type", 
"org.apache.doris.flink.tools.cdc.postgres.PostgresDateConverter");
+        DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd");
+        DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd 
HH:mm:ss.SSSSSS");
+    }
+
+    @Override
+    public void configure(Properties props) {
+        readProps(props, "format.date", p -> dateFormatter = 
DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.timestamp", p -> timestampFormatter = 
DateTimeFormatter.ofPattern(p));
+    }
+
+    private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
+        String settingValue = (String) properties.get(settingKey);
+        if (settingValue == null || settingValue.length() == 0) {
+            return;
+        }
+        try {
+            callback.accept(settingValue.trim());
+        } catch (IllegalArgumentException | DateTimeException e) {
+            log.error("setting {} is illegal:{}", settingKey, settingValue);
+            throw e;
+        }
+    }
+
+    @Override
+    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
+        String sqlType = column.typeName().toUpperCase();
+        SchemaBuilder schemaBuilder = null;
+        Converter converter = null;
+        if ("DATE".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertDate;
+        }
+        if ("TIME".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertTime;
+        }
+        if ("TIMESTAMP".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertTimestamp;
+        }
+        if (schemaBuilder != null) {
+            registration.register(schemaBuilder, converter);
+        }
+    }
+
+    private String convertDate(Object input) {
+        if (input instanceof LocalDate) {
+            return dateFormatter.format((LocalDate) input);
+        } else if (input instanceof Integer) {
+            LocalDate date = LocalDate.ofEpochDay((Integer) input);
+            return dateFormatter.format(date);
+        } else if (input instanceof Date){
+            return dateFormatter.format(((Date) input).toLocalDate());
+        }
+        return null;
+    }
+
+    private String convertTime(Object input) {
+        if (input instanceof String) {
+            return input.toString();
+        }
+        return null;
+    }
+
+    private String convertTimestamp(Object input) {
+        if (input instanceof Timestamp) {
+            return timestampFormatter.format(((Timestamp) input).toLocalDateTime());
+        } else if (input instanceof Instant){
+            LocalDateTime ldt = LocalDateTime.ofInstant(((Instant) input), ZoneOffset.UTC);
+            return timestampFormatter.format(ldt);
+        }
+        return null;
+    }
+}
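
[Editor's note] The converter above is wired in through DEFAULT_PROPS ("converters=date"
plus the "date."-prefixed settings), and configure() reads the "format.date" and
"format.timestamp" keys that Debezium hands over after stripping the converter name.
Overriding the output formats should therefore be reachable through the "debezium."
pass-through handled in PostgresDatabaseSync#buildCdcSource — a hedged sketch, assuming
Debezium's "<converter-name>.<key>" property convention:

    --postgres-conf debezium.date.format.date=yyyy/MM/dd
    --postgres-conf debezium.date.format.timestamp="yyyy/MM/dd HH:mm:ss"
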
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
new file mode 100644
index 0000000..de9d869
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.postgres;
+
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.sql.DatabaseMetaData;
+
+public class PostgresSchema extends SourceSchema {
+
+    public PostgresSchema(DatabaseMetaData metaData, String databaseName, String schemaName, String tableName, String tableComment) throws Exception {
+        super(metaData, databaseName, schemaName,  tableName, tableComment);
+    }
+
+    @Override
+    public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
+        return PostgresType.toDorisType(fieldType, precision, scale);
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java
new file mode 100644
index 0000000..87cbde2
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.postgres;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.apache.flink.util.Preconditions;
+
+public class PostgresType {
+    private static final String INT2 = "int2";
+    private static final String SMALLSERIAL = "smallserial";
+    private static final String INT4 = "int4";
+    private static final String SERIAL = "serial";
+    private static final String INT8 = "int8";
+    private static final String BIGSERIAL = "bigserial";
+    private static final String NUMERIC = "numeric";
+    private static final String FLOAT4 = "float4";
+    private static final String FLOAT8 = "float8";
+    private static final String BPCHAR = "bpchar";
+    private static final String TIMESTAMP = "timestamp";
+    private static final String TIMESTAMPTZ = "timestamptz";
+    private static final String DATE = "date";
+    private static final String BOOL = "bool";
+    private static final String BIT = "bit";
+    private static final String POINT = "point";
+    private static final String LINE = "line";
+    private static final String LSEG = "lseg";
+    private static final String BOX = "box";
+    private static final String PATH = "path";
+    private static final String POLYGON = "polygon";
+    private static final String CIRCLE = "circle";
+    private static final String VARCHAR = "varchar";
+    private static final String TEXT = "text";
+    private static final String TIME = "time";
+    private static final String TIMETZ = "timetz";
+    private static final String INTERVAL = "interval";
+    private static final String CIDR = "cidr";
+    private static final String INET = "inet";
+    private static final String MACADDR = "macaddr";
+    private static final String VARBIT = "varbit";
+    private static final String UUID = "uuid";
+    private static final String BYTEA = "bytea";
+    private static final String JSON = "json";
+    private static final String JSONB = "jsonb";
+    private static final String _INT2 = "_int2";
+    private static final String _INT4 = "_int4";
+    private static final String _INT8 = "_int8";
+    private static final String _FLOAT4 = "_float4";
+    private static final String _FLOAT8 = "_float8";
+    private static final String _DATE = "_date";
+    private static final String _TIMESTAMP = "_timestamp";
+    private static final String _BOOL = "_bool";
+    private static final String _TEXT = "_text";
+
+    public static String toDorisType(String postgresType, Integer precision, Integer scale) {
+        postgresType = postgresType.toLowerCase();
+        switch (postgresType){
+            case INT2:
+            case SMALLSERIAL:
+                return DorisType.TINYINT;
+            case INT4:
+            case SERIAL:
+                return DorisType.INT;
+            case INT8:
+            case BIGSERIAL:
+                return DorisType.BIGINT;
+            case NUMERIC:
+                return precision != null && precision <= 38
+                        ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0)
+                        : DorisType.STRING;
+            case FLOAT4:
+                return DorisType.FLOAT;
+            case FLOAT8:
+                return DorisType.DOUBLE;
+            case BPCHAR:
+                Preconditions.checkNotNull(precision);
+                return String.format("%s(%s)", DorisType.CHAR, precision);
+            case TIMESTAMP:
+            case TIMESTAMPTZ:
+                return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(precision == null ? 0 : precision, 6));
+            case DATE:
+                return DorisType.DATE_V2;
+            case BOOL:
+                return DorisType.BOOLEAN;
+            case BIT:
+                return precision == 1 ? DorisType.BOOLEAN : DorisType.STRING;
+            case VARCHAR:
+                Preconditions.checkNotNull(precision);
+                return precision * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, precision * 3);
+            case POINT:
+            case LINE:
+            case LSEG:
+            case BOX:
+            case PATH:
+            case POLYGON:
+            case CIRCLE:
+            case TEXT:
+            case TIME:
+            case TIMETZ:
+            case INTERVAL:
+            case CIDR:
+            case INET:
+            case MACADDR:
+            case VARBIT:
+            case UUID:
+            case BYTEA:
+                return DorisType.STRING;
+            case JSON:
+            case JSONB:
+                return DorisType.JSONB;
+            case _BOOL:
+                return String.format("%s<%s>", DorisType.ARRAY, DorisType.BOOLEAN);
+            case _INT2:
+                return String.format("%s<%s>", DorisType.ARRAY, DorisType.TINYINT);
+            case _INT4:
+                return String.format("%s<%s>", DorisType.ARRAY, DorisType.INT);
+            case _INT8:
+                return String.format("%s<%s>", DorisType.ARRAY, DorisType.BIGINT);
+            case _FLOAT4:
+                return String.format("%s<%s>", DorisType.ARRAY, DorisType.FLOAT);
+            case _FLOAT8:
+                return String.format("%s<%s>", DorisType.ARRAY, DorisType.DOUBLE);
+            case _TEXT:
+                return String.format("%s<%s>", DorisType.ARRAY, DorisType.STRING);
+            case _DATE:
+                return String.format("%s<%s>", DorisType.ARRAY, DorisType.DATE_V2);
+            case _TIMESTAMP:
+                return String.format("%s<%s>", DorisType.ARRAY, DorisType.DATETIME_V2);
+            default:
+                throw new UnsupportedOperationException("Unsupported Postgres Type: " + postgresType);
+        }
+    }
+}
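
[Editor's note] As an illustration of the mapping above (a hedged sketch; the exact
output strings depend on the constants in org.apache.doris.flink.catalog.doris.DorisType):

    // numeric(10,2) fits within the 38-digit decimal precision, so it maps to a decimal type
    PostgresType.toDorisType("numeric", 10, 2);
    // varchar lengths are tripled for UTF-8 bytes; 30000 * 3 > 65533 falls back to STRING
    PostgresType.toDorisType("varchar", 30000, null);
    // one-dimensional array types map to Doris ARRAY types, e.g. _int4 -> ARRAY<INT>
    PostgresType.toDorisType("_int4", null, null);
    // anything unrecognized throws UnsupportedOperationException
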
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
new file mode 100644
index 0000000..4d5b485
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcPostgresSyncDatabaseCase {
+
+    public static void main(String[] args) throws Exception{
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.disableOperatorChaining();
+        env.enableCheckpointing(10000);
+
+//        Map<String,String> flinkMap = new HashMap<>();
+//        flinkMap.put("execution.checkpointing.interval","10s");
+//        flinkMap.put("pipeline.operator-chaining","false");
+//        flinkMap.put("parallelism.default","1");
+//
+//
+//        Configuration configuration = Configuration.fromMap(flinkMap);
+//        env.configure(configuration);
+
+        String database = "db2";
+        String tablePrefix = "";
+        String tableSuffix = "";
+        Map<String,String> sourceConfig = new HashMap<>();
+        sourceConfig.put("database-name","postgres");
+        sourceConfig.put("schema-name","public");
+        sourceConfig.put("slot.name","test");
+        sourceConfig.put("decoding.plugin.name","pgoutput");
+        sourceConfig.put("hostname","127.0.0.1");
+        sourceConfig.put("port","5432");
+        sourceConfig.put("username","postgres");
+        sourceConfig.put("password","123456");
+//        sourceConfig.put("debezium.database.tablename.case.insensitive","false");
+//        sourceConfig.put("scan.incremental.snapshot.enabled","true");
+//        sourceConfig.put("debezium.include.schema.changes","false");
+        
+        Configuration config = Configuration.fromMap(sourceConfig);
+
+        Map<String,String> sinkConfig = new HashMap<>();
+        sinkConfig.put("fenodes","127.0.0.1:8737");
+        sinkConfig.put("username","root");
+        sinkConfig.put("password","");
+        sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9737");
+        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+        Map<String,String> tableConfig = new HashMap<>();
+        tableConfig.put("replication_num", "1");
+
+        String includingTables = "testcdc";
+        String excludingTables = "";
+        boolean ignoreDefaultValue = false;
+        boolean useNewSchemaChange = false;
+        DatabaseSync databaseSync = new PostgresDatabaseSync();
+        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+        databaseSync.build();
+        env.execute(String.format("Postgres-Doris Database Sync: %s", database));
+
+    }
+}

