This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8d5cbcee74 [Feature][Jdbc] Support redshift catalog (#6992)
8d5cbcee74 is described below

commit 8d5cbcee7441cb41aad59d6e77dcf90212af4c26
Author: hailin0 <[email protected]>
AuthorDate: Sat Jun 15 13:45:31 2024 +0800

    [Feature][Jdbc] Support redshift catalog (#6992)
---
 docs/en/connector-v2/sink/Redshift.md              |  99 ++++++++++
 docs/en/connector-v2/source/Redshift.md            | 133 +++++++++++++
 .../jdbc/catalog/redshift/RedshiftCatalog.java     | 213 +++++++++++++++++++++
 .../catalog/redshift/RedshiftCatalogFactory.java   |  69 +++++++
 .../redshift/RedshiftCreateTableSqlBuilder.java    | 134 +++++++++++++
 .../redshift/RedshiftDataTypeConvertor.java        | 179 ++++-------------
 .../internal/dialect/redshift/RedshiftDialect.java | 130 +++++++++++++
 .../jdbc/catalog/redshift/RedshiftCatalogTest.java | 102 ++++++++++
 8 files changed, 920 insertions(+), 139 deletions(-)

diff --git a/docs/en/connector-v2/sink/Redshift.md b/docs/en/connector-v2/sink/Redshift.md
new file mode 100644
index 0000000000..90f312fab9
--- /dev/null
+++ b/docs/en/connector-v2/sink/Redshift.md
@@ -0,0 +1,99 @@
+# Redshift
+
+> JDBC Redshift sink Connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+> `exactly-once` relies on `XA transactions`, so it is only supported for databases that
+> support `XA transactions`. You can set `is_exactly_once=true` to enable it.
+
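+For example, a minimal exactly-once sink sketch. Note: the `xa_data_source_class_name` value below is an assumption about the Redshift JDBC driver's XA DataSource class; verify it against your driver version before use.
+
+```hocon
+sink {
+    jdbc {
+        url = "jdbc:redshift://localhost:5439/mydatabase"
+        driver = "com.amazon.redshift.jdbc.Driver"
+        user = "myUser"
+        password = "myPassword"
+
+        # Enable XA-based exactly-once (assumed XA DataSource class name)
+        is_exactly_once = true
+        xa_data_source_class_name = "com.amazon.redshift.xa.RedshiftXADataSource"
+
+        generate_sink_sql = true
+        schema = "public"
+        table = "sink_table"
+    }
+}
+```
+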
+## Description
+
+Write data through JDBC. Supports batch mode and streaming mode, concurrent writing, and exactly-once
+semantics (guaranteed by XA transactions).
+
+## Supported DataSource list
+
+| datasource |                      supported versions                      |             driver              |                   url                   |                                       maven                                        |
+|------------|--------------------------------------------------------------|---------------------------------|-----------------------------------------|------------------------------------------------------------------------------------|
+| redshift   | Different dependency versions have different driver classes. | com.amazon.redshift.jdbc.Driver | jdbc:redshift://localhost:5439/database | [Download](https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42) |
+
+## Database dependency
+
+### For Spark/Flink Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42) has been placed in directory `${SEATUNNEL_HOME}/plugins/`.
+
+### For SeaTunnel Zeta Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42) has been placed in directory `${SEATUNNEL_HOME}/lib/`.
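+
+As a sketch, assuming the downloaded jar is named `redshift-jdbc42-x.x.x.jar` (the version is a placeholder), placing the driver looks like:
+
+```shell
+# For the Spark/Flink engines (jar name/version is a placeholder)
+cp redshift-jdbc42-x.x.x.jar ${SEATUNNEL_HOME}/plugins/
+# For the SeaTunnel Zeta engine
+cp redshift-jdbc42-x.x.x.jar ${SEATUNNEL_HOME}/lib/
+```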
+
+## Data Type Mapping
+
+|   SeaTunnel Data type   | Redshift Data type |
+|-------------------------|--------------------|
+| BOOLEAN                 | BOOLEAN            |
+| TINYINT<br/> SMALLINT   | SMALLINT           |
+| INT                     | INTEGER            |
+| BIGINT                  | BIGINT             |
+| FLOAT                   | REAL               |
+| DOUBLE                  | DOUBLE PRECISION   |
+| DECIMAL                 | NUMERIC            |
+| STRING(<=65535)         | CHARACTER VARYING  |
+| STRING(>65535)          | SUPER              |
+| BYTES                   | BINARY VARYING     |
+| TIME                    | TIME               |
+| TIMESTAMP               | TIMESTAMP          |
+| MAP<br/> ARRAY<br/> ROW | SUPER              |
+
+## Task Example
+
+### Simple:
+
+```hocon
+sink {
+    jdbc {
+        url = "jdbc:redshift://localhost:5439/mydatabase"
+        driver = "com.amazon.redshift.jdbc.Driver"
+        user = "myUser"
+        password = "myPassword"
+        
+        generate_sink_sql = true
+        schema = "public"
+        table = "sink_table"
+    }
+}
+```
+
+### CDC (Change Data Capture) Event
+
+> CDC change data is also supported. In this case, you need to config `database`, `table` and `primary_keys`.
+
+```hocon
+sink {
+    jdbc {
+        url = "jdbc:redshift://localhost:5439/mydatabase"
+        driver = "com.amazon.redshift.jdbc.Driver"
+        user = "myUser"
+        password = "mypassword"
+        
+        generate_sink_sql = true
+        schema = "public"
+        table = "sink_table"
+        
+        # config update/delete primary keys
+        primary_keys = ["id","name"]
+    }
+}
+```
+
diff --git a/docs/en/connector-v2/source/Redshift.md b/docs/en/connector-v2/source/Redshift.md
new file mode 100644
index 0000000000..8da5ea9391
--- /dev/null
+++ b/docs/en/connector-v2/source/Redshift.md
@@ -0,0 +1,133 @@
+# Redshift
+
+> JDBC Redshift Source Connector
+
+## Description
+
+Read external data source data through JDBC.
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+### For Spark/Flink Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42) has been placed in directory `${SEATUNNEL_HOME}/plugins/`.
+
+### For SeaTunnel Zeta Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42) has been placed in directory `${SEATUNNEL_HOME}/lib/`.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+> Supports query SQL and can achieve projection.
+
+## Supported DataSource list
+
+| datasource |                      supported versions                      |             driver              |                   url                   |                                       maven                                        |
+|------------|--------------------------------------------------------------|---------------------------------|-----------------------------------------|------------------------------------------------------------------------------------|
+| redshift   | Different dependency versions have different driver classes. | com.amazon.redshift.jdbc.Driver | jdbc:redshift://localhost:5439/database | [Download](https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42) |
+
+## Database dependency
+
+> Please download the driver jar corresponding to 'Maven' and copy it to the `$SEATUNNEL_HOME/plugins/jdbc/lib/` working directory<br/>
+> For example Redshift datasource: `cp RedshiftJDBC42-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/`
+
+## Data Type Mapping
+
+|                                                Redshift Data type                                                 |                                                               SeaTunnel Data type                                                                |
+|-------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| SMALLINT<br />INT2                                                                                                | SHORT                                                                                                                                            |
+| INTEGER<br />INT<br />INT4                                                                                        | INT                                                                                                                                              |
+| BIGINT<br />INT8<br />OID                                                                                         | LONG                                                                                                                                             |
+| DECIMAL<br />NUMERIC                                                                                              | DECIMAL((the designated column's specified column size) + 1,<br/>(the designated column's number of digits to the right of the decimal point))   |
+| REAL<br />FLOAT4                                                                                                  | FLOAT                                                                                                                                            |
+| DOUBLE_PRECISION<br />FLOAT8<br />FLOAT                                                                           | DOUBLE                                                                                                                                           |
+| BOOLEAN<br />BOOL                                                                                                 | BOOLEAN                                                                                                                                          |
+| CHAR<br />CHARACTER<br />NCHAR<br />BPCHAR<br />VARCHAR<br />CHARACTER_VARYING<br />NVARCHAR<br />TEXT<br />SUPER | STRING                                                                                                                                           |
+| VARBYTE<br />BINARY_VARYING                                                                                       | BYTES                                                                                                                                            |
+| TIME<br />TIME_WITH_TIME_ZONE<br />TIMETZ                                                                         | LOCALTIME                                                                                                                                        |
+| TIMESTAMP<br />TIMESTAMP_WITH_OUT_TIME_ZONE<br />TIMESTAMPTZ                                                      | LOCALDATETIME                                                                                                                                    |
+
+## Example
+
+### Simple:
+
+> This example reads the `table2` table in your `dev` database and queries all of its fields. You can also specify which fields to query for final output to the console.
+
+```hocon
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+}
+source {
+    Jdbc {
+        url = "jdbc:redshift://localhost:5439/dev"
+        driver = "com.amazon.redshift.jdbc.Driver"
+        user = "root"
+        password = "123456"
+        
+        table_path = "public.table2"
+        # Use query to filter rows & columns
+        query = "select id, name from public.table2 where id > 100"
+        
+        #split.size = 8096
+        #split.even-distribution.factor.upper-bound = 100
+        #split.even-distribution.factor.lower-bound = 0.05
+        #split.sample-sharding.threshold = 1000
+        #split.inverse-sampling.rate = 1000
+    }
+}
+
+sink {
+    Console {}
+}
+```
+
+### Multiple table read:
+
+***Configuring `table_list` will turn on auto split; you can configure `split.*` to adjust the split strategy.***
+
+```hocon
+env {
+  job.mode = "BATCH"
+  parallelism = 2
+}
+source {
+  Jdbc {
+    url = "jdbc:redshift://localhost:5439/dev"
+    driver = "com.amazon.redshift.jdbc.Driver"
+    user = "root"
+    password = "123456"
+
+    table_list = [
+      {
+        table_path = "public.table1"
+      },
+      {
+        table_path = "public.table2"
+        # Use query to filter rows & columns
+        query = "select id, name from public.table2 where id > 100"
+      }
+    ]
+    #split.size = 8096
+    #split.even-distribution.factor.upper-bound = 100
+    #split.even-distribution.factor.lower-bound = 0.05
+    #split.sample-sharding.threshold = 1000
+    #split.inverse-sampling.rate = 1000
+  }
+}
+
+sink {
+  Console {}
+}
+```
+
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
new file mode 100644
index 0000000000..7b29bbb8ea
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeMapper;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+public class RedshiftCatalog extends AbstractJdbcCatalog {
+
+    protected static final Set<String> EXCLUDED_SCHEMAS = new HashSet<>(4);
+
+    private static final String SELECT_COLUMNS =
+            "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ordinal_position ASC";
+
+    static {
+        EXCLUDED_SCHEMAS.add("information_schema");
+        EXCLUDED_SCHEMAS.add("catalog_history");
+        EXCLUDED_SCHEMAS.add("pg_auto_copy");
+        EXCLUDED_SCHEMAS.add("pg_automv");
+        EXCLUDED_SCHEMAS.add("pg_catalog");
+        EXCLUDED_SCHEMAS.add("pg_internal");
+        EXCLUDED_SCHEMAS.add("pg_s3");
+    }
+
+    static {
+        SYS_DATABASES.add("template0");
+        SYS_DATABASES.add("template1");
+        SYS_DATABASES.add("awsdatacatalog");
+        SYS_DATABASES.add("padb_harvest");
+    }
+
+    protected final Map<String, Connection> connectionMap;
+
+    public RedshiftCatalog(
+            String catalogName,
+            String username,
+            String pwd,
+            JdbcUrlUtil.UrlInfo urlInfo,
+            String schema) {
+        super(catalogName, username, pwd, urlInfo, schema);
+        this.connectionMap = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (SQLException e) {
+                throw new CatalogException(
+                        String.format("Failed to close %s via JDBC.", entry.getKey()), e);
+            }
+        }
+        super.close();
+    }
+
+    @Override
+    protected String getListDatabaseSql() {
+        return "select datname from pg_database;";
+    }
+
+    @Override
+    protected String getListTableSql(String databaseName) {
+        return "SELECT table_schema, table_name FROM information_schema.tables;";
+    }
+
+    @Override
+    protected String getTableName(ResultSet rs) throws SQLException {
+        StringBuilder stringBuilder = new StringBuilder();
+        return stringBuilder
+                .append(rs.getString(1))
+                .append(".")
+                .append(rs.getString(2))
+                .toString()
+                .toLowerCase();
+    }
+
+    @Override
+    protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
+        String createTableSql =
+                new RedshiftCreateTableSqlBuilder(table)
+                        .build(tablePath, table.getOptions().get("fieldIde"));
+        return CatalogUtils.getFieldIde(createTableSql, table.getOptions().get("fieldIde"));
+    }
+
+    @Override
+    protected String getDropTableSql(TablePath tablePath) {
+        return String.format(
+                "DROP TABLE %s;", tablePath.getSchemaName() + "." + tablePath.getTableName());
+    }
+
+    @Override
+    protected String getTruncateTableSql(TablePath tablePath) {
+        return String.format(
+                "TRUNCATE TABLE %s;", tablePath.getSchemaName() + "." + tablePath.getTableName());
+    }
+
+    @Override
+    protected String getCreateDatabaseSql(String databaseName) {
+        return String.format("CREATE DATABASE \"%s\";", databaseName);
+    }
+
+    @Override
+    protected String getDropDatabaseSql(String databaseName) {
+        return String.format("DROP DATABASE \"%s\";", databaseName);
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
+                return databaseExists(tablePath.getDatabaseName())
+                        && listTables(tablePath.getDatabaseName())
+                                .contains(tablePath.getSchemaAndTableName().toLowerCase());
+            }
+            return listTables(defaultDatabase)
+                    .contains(tablePath.getSchemaAndTableName().toLowerCase());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+
+    @Override
+    protected String getSelectColumnsSql(TablePath tablePath) {
+        return String.format(SELECT_COLUMNS, tablePath.getSchemaName(), tablePath.getTableName());
+    }
+
+    @Override
+    protected TableIdentifier getTableIdentifier(TablePath tablePath) {
+        return TableIdentifier.of(
+                catalogName,
+                tablePath.getDatabaseName(),
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
+    }
+
+    @Override
+    protected Column buildColumn(ResultSet resultSet) throws SQLException {
+        String columnName = resultSet.getString("COLUMN_NAME");
+        String typeName = resultSet.getString("DATA_TYPE").toUpperCase();
+        long precision = resultSet.getLong("NUMERIC_PRECISION");
+        int scale = resultSet.getInt("NUMERIC_SCALE");
+        long columnLength = resultSet.getLong("CHARACTER_MAXIMUM_LENGTH");
+        Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
+        String isNullableStr = resultSet.getString("IS_NULLABLE");
+        boolean isNullable = isNullableStr.equals("YES");
+
+        BasicTypeDefine typeDefine =
+                BasicTypeDefine.builder()
+                        .name(columnName)
+                        .columnType(typeName)
+                        .dataType(typeName)
+                        .length(columnLength)
+                        .precision(precision)
+                        .scale(scale)
+                        .nullable(isNullable)
+                        .defaultValue(defaultValue)
+                        .build();
+        return RedshiftTypeConverter.INSTANCE.convert(typeDefine);
+    }
+
+    @Override
+    public String getExistDataSql(TablePath tablePath) {
+        return String.format("select * from %s LIMIT 1;", tablePath.getFullName());
+    }
+
+    @Override
+    public CatalogTable getTable(String sqlQuery) throws SQLException {
+        return CatalogUtils.getCatalogTable(
+                getConnection(getUrlFromDatabaseName(defaultDatabase)),
+                sqlQuery,
+                new RedshiftTypeMapper());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogFactory.java
new file mode 100644
index 0000000000..31409b3b21
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Optional;
+
+@AutoService(Factory.class)
+public class RedshiftCatalogFactory implements CatalogFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return DatabaseIdentifier.REDSHIFT;
+    }
+
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(urlWithDatabase),
+                "Miss config <base-url>! Please check your config.");
+        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
+        Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
+        if (!defaultDatabase.isPresent()) {
+            throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
+        }
+        return new RedshiftCatalog(
+                catalogName,
+                options.get(JdbcCatalogOptions.USERNAME),
+                options.get(JdbcCatalogOptions.PASSWORD),
+                urlInfo,
+                options.get(JdbcCatalogOptions.SCHEMA));
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return JdbcCatalogOptions.BASE_RULE.build();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCreateTableSqlBuilder.java
new file mode 100644
index 0000000000..01e6439710
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCreateTableSqlBuilder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeConverter;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class RedshiftCreateTableSqlBuilder {
+    private List<Column> columns;
+    private PrimaryKey primaryKey;
+    private String sourceCatalogName;
+
+    public RedshiftCreateTableSqlBuilder(CatalogTable catalogTable) {
+        this.columns = catalogTable.getTableSchema().getColumns();
+        this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+        this.sourceCatalogName = catalogTable.getCatalogName();
+    }
+
+    public String build(TablePath tablePath) {
+        return build(tablePath, "");
+    }
+
+    public String build(TablePath tablePath, String fieldIde) {
+        StringBuilder createTableSql = new StringBuilder();
+        createTableSql
+                .append(CatalogUtils.quoteIdentifier("CREATE TABLE ", fieldIde))
+                .append(tablePath.getSchemaAndTableName("\""))
+                .append(" (\n");
+
+        List<String> columnSqls =
+                columns.stream()
+                        .map(
+                                column ->
+                                        CatalogUtils.quoteIdentifier(
+                                                buildColumnSql(column), fieldIde))
+                        .collect(Collectors.toList());
+
+        if (primaryKey != null && primaryKey.getColumnNames().size() > 1) {
+            columnSqls.add(
+                    CatalogUtils.quoteIdentifier(
+                            "PRIMARY KEY ("
+                                    + primaryKey.getColumnNames().stream()
+                                            .map(column -> "\"" + column + "\"")
+                                            .collect(Collectors.joining(","))
+                                    + ")",
+                            fieldIde));
+        }
+        createTableSql.append(String.join(",\n", columnSqls));
+        createTableSql.append("\n);");
+
+        List<String> commentSqls =
+                columns.stream()
+                        .filter(column -> StringUtils.isNotBlank(column.getComment()))
+                        .map(
+                                columns ->
+                                        buildColumnCommentSql(
+                                                columns,
+                                                tablePath.getSchemaAndTableName("\""),
+                                                fieldIde))
+                        .collect(Collectors.toList());
+
+        if (!commentSqls.isEmpty()) {
+            createTableSql.append("\n");
+            createTableSql.append(String.join(";\n", commentSqls)).append(";");
+        }
+
+        return createTableSql.toString();
+    }
+
+    private String buildColumnSql(Column column) {
+        StringBuilder columnSql = new StringBuilder();
+        columnSql.append("\"").append(column.getName()).append("\" ");
+        String columnType =
+                (StringUtils.equals(sourceCatalogName, DatabaseIdentifier.REDSHIFT)
+                                        || StringUtils.equals(
+                                                sourceCatalogName, DatabaseIdentifier.POSTGRESQL))
+                                && StringUtils.isNotBlank(column.getSourceType())
+                        ? column.getSourceType()
+                        : RedshiftTypeConverter.INSTANCE.reconvert(column).getColumnType();
+        columnSql.append(columnType);
+
+        if (!column.isNullable()) {
+            columnSql.append(" NOT NULL");
+        }
+
+        if (primaryKey != null
+                && primaryKey.getColumnNames().contains(column.getName())
+                && primaryKey.getColumnNames().size() == 1) {
+            columnSql.append(" PRIMARY KEY");
+        }
+
+        return columnSql.toString();
+    }
+
+    private String buildColumnCommentSql(Column column, String tableName, String fieldIde) {
+        StringBuilder columnCommentSql = new StringBuilder();
+        columnCommentSql
+                .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde))
+                .append(tableName)
+                .append(".");
+        columnCommentSql
+                .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\""))
+                .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
+                .append(column.getComment())
+                .append("'");
+        return columnCommentSql.toString();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
index a16944a250..330ea6b28d 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
@@ -18,15 +18,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift;
 
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SqlType;
-import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeConverter;
 
 import org.apache.commons.collections4.MapUtils;
 
@@ -37,6 +35,8 @@ import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+/** @deprecated instead by {@link RedshiftTypeConverter} */
+@Deprecated
 @AutoService(DataTypeConvertor.class)
 public class RedshiftDataTypeConvertor implements DataTypeConvertor<String> {
 
@@ -44,56 +44,8 @@ public class RedshiftDataTypeConvertor implements DataTypeConvertor<String> {
     public static final String SCALE = "scale";
 
     public static final Integer DEFAULT_PRECISION = 10;
-
     public static final Integer DEFAULT_SCALE = 0;
 
-    /* ============================ data types ===================== */
-    private static final String REDSHIFT_SMALLINT = "smallint";
-    private static final String REDSHIFT_INT2 = "int2";
-    private static final String REDSHIFT_INTEGER = "integer";
-    private static final String REDSHIFT_INT = "int";
-    private static final String REDSHIFT_INT4 = "int4";
-    private static final String REDSHIFT_BIGINT = "bigint";
-    private static final String REDSHIFT_INT8 = "int8";
-
-    private static final String REDSHIFT_DECIMAL = "decimal";
-    private static final String REDSHIFT_NUMERIC = "numeric";
-    private static final String REDSHIFT_REAL = "real";
-    private static final String REDSHIFT_FLOAT4 = "float4";
-    private static final String REDSHIFT_DOUBLE_PRECISION = "double precision";
-    private static final String REDSHIFT_FLOAT8 = "float8";
-    private static final String REDSHIFT_FLOAT = "float";
-
-    private static final String REDSHIFT_BOOLEAN = "boolean";
-    private static final String REDSHIFT_BOOL = "bool";
-
-    private static final String REDSHIFT_CHAR = "char";
-    private static final String REDSHIFT_CHARACTER = "character";
-    private static final String REDSHIFT_NCHAR = "nchar";
-    private static final String REDSHIFT_BPCHAR = "bpchar";
-
-    private static final String REDSHIFT_VARCHAR = "varchar";
-    private static final String REDSHIFT_CHARACTER_VARYING = "character varying";
-    private static final String REDSHIFT_NVARCHAR = "nvarchar";
-    private static final String REDSHIFT_TEXT = "text";
-
-    private static final String REDSHIFT_DATE = "date";
-    /*FIXME*/
-
-    private static final String REDSHIFT_GEOMETRY = "geometry";
-    private static final String REDSHIFT_OID = "oid";
-    private static final String REDSHIFT_SUPER = "super";
-
-    private static final String REDSHIFT_TIME = "time";
-    private static final String REDSHIFT_TIME_WITH_TIME_ZONE = "time with time zone";
-
-    private static final String REDSHIFT_TIMETZ = "timetz";
-    private static final String REDSHIFT_TIMESTAMP = "timestamp";
-    private static final String REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE =
-            "timestamp without time zone";
-
-    private static final String REDSHIFT_TIMESTAMPTZ = "timestamptz";
-
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataType) {
         return toSeaTunnelType(field, connectorDataType, Collections.emptyMap());
@@ -103,60 +55,29 @@ public class RedshiftDataTypeConvertor implements DataTypeConvertor<String> {
     public SeaTunnelDataType<?> toSeaTunnelType(
             String field, String connectorDataType, Map<String, Object> dataTypeProperties) {
         checkNotNull(connectorDataType, "redshiftType cannot be null");
-        switch (connectorDataType) {
-            case REDSHIFT_SMALLINT:
-            case REDSHIFT_INT2:
-                return BasicType.SHORT_TYPE;
-            case REDSHIFT_INTEGER:
-            case REDSHIFT_INT:
-            case REDSHIFT_INT4:
-                return BasicType.INT_TYPE;
-            case REDSHIFT_BIGINT:
-            case REDSHIFT_INT8:
-            case REDSHIFT_OID:
-                return BasicType.LONG_TYPE;
-            case REDSHIFT_DECIMAL:
-            case REDSHIFT_NUMERIC:
-                Integer precision =
-                        MapUtils.getInteger(dataTypeProperties, PRECISION, DEFAULT_PRECISION);
-                Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE, DEFAULT_SCALE);
-                return new DecimalType(precision, scale);
-            case REDSHIFT_REAL:
-            case REDSHIFT_FLOAT4:
-                return BasicType.FLOAT_TYPE;
-            case REDSHIFT_DOUBLE_PRECISION:
-            case REDSHIFT_FLOAT8:
-            case REDSHIFT_FLOAT:
-                return BasicType.DOUBLE_TYPE;
-            case REDSHIFT_BOOLEAN:
-            case REDSHIFT_BOOL:
-                return BasicType.BOOLEAN_TYPE;
-            case REDSHIFT_CHAR:
-            case REDSHIFT_CHARACTER:
-            case REDSHIFT_NCHAR:
-            case REDSHIFT_BPCHAR:
-            case REDSHIFT_VARCHAR:
-            case REDSHIFT_CHARACTER_VARYING:
-            case REDSHIFT_NVARCHAR:
-            case REDSHIFT_TEXT:
-            case REDSHIFT_SUPER:
-                return BasicType.STRING_TYPE;
-            case REDSHIFT_DATE:
-                return LocalTimeType.LOCAL_DATE_TYPE;
-            case REDSHIFT_GEOMETRY:
-                return PrimitiveByteArrayType.INSTANCE;
-            case REDSHIFT_TIME:
-            case REDSHIFT_TIME_WITH_TIME_ZONE:
-            case REDSHIFT_TIMETZ:
-                return LocalTimeType.LOCAL_TIME_TYPE;
-            case REDSHIFT_TIMESTAMP:
-            case REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE:
-            case REDSHIFT_TIMESTAMPTZ:
-                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+
+        Integer precision = null;
+        Integer scale = null;
+        switch (connectorDataType.toUpperCase()) {
+            case RedshiftTypeConverter.PG_NUMERIC:
+                precision = MapUtils.getInteger(dataTypeProperties, PRECISION, DEFAULT_PRECISION);
+                scale = MapUtils.getInteger(dataTypeProperties, SCALE, DEFAULT_SCALE);
+                break;
             default:
-                throw CommonError.convertToSeaTunnelTypeError(
-                        DatabaseIdentifier.REDSHIFT, connectorDataType, field);
+                break;
         }
+
+        BasicTypeDefine typeDefine =
+                BasicTypeDefine.builder()
+                        .name(field)
+                        .columnType(connectorDataType)
+                        .dataType(connectorDataType)
+                        .length(precision == null ? null : Long.valueOf(precision))
+                        .precision(precision == null ? null : Long.valueOf(precision))
+                        .scale(scale)
+                        .build();
+
+        return RedshiftTypeConverter.INSTANCE.convert(typeDefine).getDataType();
     }
 
     @Override
@@ -165,39 +86,19 @@ public class RedshiftDataTypeConvertor implements DataTypeConvertor<String> {
             SeaTunnelDataType<?> seaTunnelDataType,
             Map<String, Object> dataTypeProperties) {
         checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
-        SqlType sqlType = seaTunnelDataType.getSqlType();
-        switch (sqlType) {
-            case TINYINT:
-            case SMALLINT:
-                return REDSHIFT_SMALLINT;
-            case INT:
-                return REDSHIFT_INTEGER;
-            case BIGINT:
-                return REDSHIFT_BIGINT;
-            case DECIMAL:
-                return REDSHIFT_DECIMAL;
-            case FLOAT:
-                return REDSHIFT_FLOAT4;
-            case DOUBLE:
-                return REDSHIFT_DOUBLE_PRECISION;
-            case BOOLEAN:
-                return REDSHIFT_BOOLEAN;
-            case STRING:
-                return REDSHIFT_TEXT;
-            case DATE:
-                return REDSHIFT_DATE;
-            case BYTES:
-                return REDSHIFT_GEOMETRY;
-            case TIME:
-                return REDSHIFT_TIME;
-            case TIMESTAMP:
-                return REDSHIFT_TIMESTAMP;
-            default:
-                throw CommonError.convertToConnectorTypeError(
-                        DatabaseIdentifier.REDSHIFT,
-                        seaTunnelDataType.getSqlType().toString(),
-                        field);
-        }
+
+        Long precision = MapUtils.getLong(dataTypeProperties, PRECISION);
+        Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE);
+        Column column =
+                PhysicalColumn.builder()
+                        .name(field)
+                        .dataType(seaTunnelDataType)
+                        .columnLength(precision)
+                        .scale(scale)
+                        .nullable(true)
+                        .build();
+        BasicTypeDefine typeDefine = RedshiftTypeConverter.INSTANCE.reconvert(column);
+        return typeDefine.getColumnType();
     }
 
     @Override
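For context on the numeric branch above: only decimal/numeric carries precision and scale through dataTypeProperties; every other type is resolved by RedshiftTypeConverter from the type name alone. A standalone sketch of the defaulting behavior follows (the map keys and the defaults 10/0 mirror the constants kept in the convertor; the class name is illustrative):

```java
import java.util.Collections;
import java.util.Map;

// Standalone sketch of the precision/scale defaulting used for numeric types.
// DEFAULT_PRECISION/DEFAULT_SCALE mirror the constants kept in the convertor.
public class NumericDefaultsSketch {
    static final int DEFAULT_PRECISION = 10;
    static final int DEFAULT_SCALE = 0;

    // Returns {precision, scale}, falling back to the defaults when a key is absent.
    static int[] resolveNumeric(Map<String, ?> props) {
        Object p = props.get("precision");
        Object s = props.get("scale");
        int precision = p == null ? DEFAULT_PRECISION : (Integer) p;
        int scale = s == null ? DEFAULT_SCALE : (Integer) s;
        return new int[] {precision, scale};
    }

    public static void main(String[] args) {
        int[] defaults = resolveNumeric(Collections.emptyMap());
        System.out.println(defaults[0] + "," + defaults[1]); // prints 10,0
        int[] explicit = resolveNumeric(Map.of("precision", 38, "scale", 2));
        System.out.println(explicit[0] + "," + explicit[1]); // prints 38,2
    }
}
```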
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java
index 11538ded44..d310e8412e 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java
@@ -17,14 +17,34 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift;
 
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.SQLUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Optional;
 
 public class RedshiftDialect implements JdbcDialect {
+    public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128;
+    public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+    public RedshiftDialect() {}
+
+    public RedshiftDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
     @Override
     public String dialectName() {
         return DatabaseIdentifier.REDSHIFT;
@@ -45,4 +65,114 @@ public class RedshiftDialect implements JdbcDialect {
             String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
         return Optional.empty();
     }
+
+    @Override
+    public PreparedStatement creatPreparedStatement(
+            Connection connection, String queryTemplate, int fetchSize) throws SQLException {
+        // use cursor mode, reference:
+        connection.setAutoCommit(false);
+        PreparedStatement statement =
+                connection.prepareStatement(
+                        queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+        if (fetchSize > 0) {
+            statement.setFetchSize(fetchSize);
+        } else {
+            statement.setFetchSize(DEFAULT_POSTGRES_FETCH_SIZE);
+        }
+        return statement;
+    }
+
+    @Override
+    public String tableIdentifier(String database, String tableName) {
+        return quoteDatabaseIdentifier(database) + "." + quoteIdentifier(tableName);
+    }
+
+    @Override
+    public String tableIdentifier(TablePath tablePath) {
+        return tablePath.getFullNameWithQuoted("\"");
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        if (identifier.contains(".")) {
+            String[] parts = identifier.split("\\.");
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < parts.length - 1; i++) {
+                sb.append("\"").append(parts[i]).append("\"").append(".");
+            }
+            return sb.append("\"")
+                    .append(getFieldIde(parts[parts.length - 1], fieldIde))
+                    .append("\"")
+                    .toString();
+        }
+
+        return "\"" + getFieldIde(identifier, fieldIde) + "\"";
+    }
+
+    @Override
+    public String quoteDatabaseIdentifier(String identifier) {
+        return "\"" + identifier + "\"";
+    }
+
+    @Override
+    public TablePath parse(String tablePath) {
+        return TablePath.of(tablePath, true);
+    }
+
+    @Override
+    public String hashModForField(String nativeType, String fieldName, int mod) {
+        String quoteFieldName = quoteIdentifier(fieldName);
+        if (StringUtils.isNotBlank(nativeType)) {
+            quoteFieldName = convertType(quoteFieldName, nativeType);
+        }
+        return "(ABS(MURMUR3_32_HASH(" + quoteFieldName + ")) % " + mod + ")";
+    }
+
+    @Override
+    public String hashModForField(String fieldName, int mod) {
+        return hashModForField(null, fieldName, mod);
+    }
+
+    @Override
+    public Long approximateRowCntStatement(Connection connection, JdbcSourceTable table)
+            throws SQLException {
+
+        // 1. If no query is configured, use TABLE STATUS.
+        // 2. If a query is configured but does not contain a WHERE clause and tablePath is
+        // configured, use TABLE STATUS.
+        // 3. If a query is configured with a WHERE clause, or a query statement is configured but
+        // tablePath is TablePath.DEFAULT, use COUNT(*).
+
+        boolean useTableStats =
+                StringUtils.isBlank(table.getQuery())
+                        || (!table.getQuery().toLowerCase().contains("where")
+                                && table.getTablePath() != null
+                                && !TablePath.DEFAULT
+                                        .getFullName()
+                                        .equals(table.getTablePath().getFullName()));
+        if (useTableStats) {
+            String rowCountQuery =
+                    String.format(
+                            "SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';",
+                            table.getTablePath().getTableName());
+            try (Statement stmt = connection.createStatement()) {
+                log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery);
+                try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
+                    if (!rs.next()) {
+                        throw new SQLException(
+                                String.format(
+                                        "No result returned after running query [%s]",
+                                        rowCountQuery));
+                    }
+                    return rs.getLong(1);
+                }
+            } catch (SQLException e) {
+                log.warn(
+                        "Failed to get approximate row count from table status, fallback to count rows",
+                        e);
+                return SQLUtils.countForTable(connection, tableIdentifier(table.getTablePath()));
+            }
+        }
+        return SQLUtils.countForSubquery(connection, table.getQuery());
+    }
 }
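The dotted-identifier rule in quoteIdentifier above can be exercised in isolation: every part of a dotted name is double-quoted, with the field-identifier transform applied to the last part. A standalone sketch (not the connector class), assuming the ORIGINAL field-identifier mode so names pass through unchanged:

```java
// Standalone sketch of the dotted-identifier quoting rule from
// RedshiftDialect.quoteIdentifier, with the fieldIde transform omitted
// (ORIGINAL mode: identifiers pass through unchanged).
public class QuoteIdentifierSketch {

    static String quoteIdentifier(String identifier) {
        if (identifier.contains(".")) {
            String[] parts = identifier.split("\\.");
            StringBuilder sb = new StringBuilder();
            // Quote every leading part, keeping the dots between them.
            for (int i = 0; i < parts.length - 1; i++) {
                sb.append("\"").append(parts[i]).append("\"").append(".");
            }
            return sb.append("\"").append(parts[parts.length - 1]).append("\"").toString();
        }
        return "\"" + identifier + "\"";
    }

    public static void main(String[] args) {
        System.out.println(quoteIdentifier("schema.table")); // prints "schema"."table"
        System.out.println(quoteIdentifier("col"));          // prints "col"
    }
}
```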
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogTest.java
new file mode 100644
index 0000000000..27439ec1ed
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalogTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+
+public class RedshiftCatalogTest {
+
+    private static final CatalogTable CATALOG_TABLE =
+            CatalogTable.of(
+                    TableIdentifier.of("catalog", "database", "table"),
+                    TableSchema.builder()
+                            .columns(
+                                    Arrays.asList(
+                                            PhysicalColumn.of(
+                                                    "test",
+                                                    BasicType.STRING_TYPE,
+                                                    (Long) null,
+                                                    true,
+                                                    null,
+                                                    ""),
+                                            PhysicalColumn.of(
+                                                    "test2",
+                                                    BasicType.STRING_TYPE,
+                                                    (Long) null,
+                                                    true,
+                                                    null,
+                                                    ""),
+                                            PhysicalColumn.of(
+                                                    "test3",
+                                                    BasicType.STRING_TYPE,
+                                                    (Long) null,
+                                                    true,
+                                                    null,
+                                                    "")))
+                            .primaryKey(
+                                    new PrimaryKey(
+                                            "test_primary_keys", Arrays.asList("test", "test2")))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.emptyList(),
+                    "comment");
+
+    @Test
+    void testCreateTableSqlWithPrimaryKeys() {
+        RedshiftCatalogFactory factory = new RedshiftCatalogFactory();
+        RedshiftCatalog catalog =
+                (RedshiftCatalog)
+                        factory.createCatalog(
+                                "test",
+                                ReadonlyConfig.fromMap(
+                                        new HashMap<String, Object>() {
+                                            {
+                                                put(
+                                                        "base-url",
+                                                        "jdbc:redshift://localhost:5432/test");
+                                                put("username", "test");
+                                                put("password", "test");
+                                            }
+                                        }));
+        String sql = catalog.getCreateTableSql(TablePath.of("test.test.test"), CATALOG_TABLE);
+        Assertions.assertEquals(
+                "CREATE TABLE \"test\".\"test\" (\n"
+                        + "\"test\" CHARACTER VARYING(65535),\n"
+                        + "\"test2\" CHARACTER VARYING(65535),\n"
+                        + "\"test3\" CHARACTER VARYING(65535),\n"
+                        + "PRIMARY KEY (\"test\",\"test2\")\n"
+                        + ");",
+                sql);
+    }
+}
