This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e6b4f9872 [Feature][Connector][Jdbc] Add vertica connector. (#4303)
e6b4f9872 is described below

commit e6b4f98721100454c709d55c77c63ff8543268a3
Author: FlechazoW <[email protected]>
AuthorDate: Mon Apr 17 10:53:33 2023 +0800

    [Feature][Connector][Jdbc] Add vertica connector. (#4303)
    
    * [Feature][Connector][Jdbc] Add vertica connector.
---
 docs/en/connector-v2/sink/Jdbc.md                  |   4 +-
 docs/en/connector-v2/source/Jdbc.md                |   2 +
 release-note.md                                    |   1 +
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  12 ++
 .../internal/dialect/vertica/VerticaDialect.java   | 109 +++++++++++++
 .../dialect/vertica/VerticaDialectFactory.java     |  37 +++++
 .../dialect/vertica/VerticaJdbcRowConverter.java   |  27 ++++
 .../dialect/vertica/VerticaTypeMapper.java         | 177 +++++++++++++++++++++
 .../connector-jdbc-e2e-part-3/pom.xml              |   5 +
 .../connectors/seatunnel/jdbc/JdbcVerticaIT.java   | 138 ++++++++++++++++
 .../resources/jdbc_vertica_source_and_sink.conf    |  45 ++++++
 11 files changed, 556 insertions(+), 1 deletion(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index f2f1b0f3b..d5b17c757 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -86,7 +86,7 @@ This option is used to support operations such as `insert`, 
`delete`, and `updat
 
 ### support_upsert_by_query_primary_key_exist [boolean]
 
-Choose to use INSERT sql, UPDATE sql to process update events(INSERT, 
UPDATE_AFTER) based on query primary key exists. This configuration is only 
used when database unsupport upsert syntax.
+Choose to use INSERT sql, UPDATE sql to process update events(INSERT, 
UPDATE_AFTER) based on whether the queried primary key exists. This configuration is only 
used when the database does not support upsert syntax.
 **Note**: that this method has low performance
 
 ### connection_check_timeout_sec [int]
@@ -161,6 +161,7 @@ there are some reference value for params above.
 | Doris      | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
 | teradata   | com.teradata.jdbc.TeraDriver                 | 
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test              | /          
                                        | 
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc                   
                            |
 | Redshift   | com.amazon.redshift.jdbc42.Driver            | 
jdbc:redshift://localhost:5439/testdb                              | 
com.amazon.redshift.xa.RedshiftXADataSource        | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                            |
+| Vertica    | com.vertica.jdbc.Driver                      | 
jdbc:vertica://localhost:5433                                      | /          
                                        | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
             |
 
 ## Example
 
@@ -238,4 +239,5 @@ sink {
 - [Feature] Support Redshift JDBC 
Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
 - [Improve] Add config item enable upsert by 
query([#3708](https://github.com/apache/incubator-seatunnel/pull/3708))
 - [Improve] Add database field to sink 
config([#4199](https://github.com/apache/incubator-seatunnel/pull/4199))
+- [Improve] Add Vertica 
connector([#4303](https://github.com/apache/incubator-seatunnel/pull/4303))
 
diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index 610125980..183a97d3b 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -118,6 +118,7 @@ there are some reference value for params above.
 | doris      | com.mysql.cj.jdbc.Driver                            | 
jdbc:mysql://localhost:3306/test                                       | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
 | teradata   | com.teradata.jdbc.TeraDriver                        | 
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test                  | 
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc                   
                            |
 | Redshift   | com.amazon.redshift.jdbc42.Driver                   | 
jdbc:redshift://localhost:5439/testdb                                  | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                            |
+| Vertica    | com.vertica.jdbc.Driver                             | 
jdbc:vertica://localhost:5433                                          | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
             |
 
 ## Example
 
@@ -174,4 +175,5 @@ Jdbc {
 - [Feature] Support Doris JDBC Source 
([3586](https://github.com/apache/incubator-seatunnel/pull/3586))
 - [Feature] Support Redshift JDBC 
Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
 - [BugFix] Fix jdbc connection reset bug 
([3670](https://github.com/apache/incubator-seatunnel/pull/3670))
+- [Improve] Add Vertica 
connector([#4303](https://github.com/apache/incubator-seatunnel/pull/4303))
 
diff --git a/release-note.md b/release-note.md
index 5554c24d8..f2dec47af 100644
--- a/release-note.md
+++ b/release-note.md
@@ -20,6 +20,7 @@
 - [Github] Add Github source connector #4155
 - [CDC] Support export debezium-json format to kafka #4339
 - [RocketMQ] Add RocketMQ source and sink connector #4007
+- [Jdbc] Add vertica connector #4303
 ### Formats
 - [Canal]Support read canal format message #3950
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 0b9733c6e..f92f605a4 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -43,6 +43,7 @@
         <teradata.version>17.20.00.12</teradata.version>
         <redshift.version>2.1.0.9</redshift.version>
         <saphana.version>2.14.7</saphana.version>
+        <vertica.version>12.0.3-0</vertica.version>
     </properties>
 
     <dependencyManagement>
@@ -122,6 +123,12 @@
                 <version>${saphana.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>com.vertica.jdbc</groupId>
+                <artifactId>vertica-jdbc</artifactId>
+                <version>${vertica.version}</version>
+                <scope>provided</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -184,5 +191,10 @@
             <groupId>com.sap.cloud.db.jdbc</groupId>
             <artifactId>ngdbc</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>com.vertica.jdbc</groupId>
+            <artifactId>vertica-jdbc</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
new file mode 100644
index 000000000..282594ccf
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class VerticaDialect implements JdbcDialect {
+    @Override
+    public String dialectName() {
+        return "Vertica";
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new VerticaJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new VerticaTypeMapper();
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return "\"" + identifier + "\"";
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields) {
+        List<String> nonUniqueKeyFields =
+                Arrays.stream(fieldNames)
+                        .filter(fieldName -> 
!Arrays.asList(uniqueKeyFields).contains(fieldName))
+                        .collect(Collectors.toList());
+        String valuesBinding =
+                Arrays.stream(fieldNames)
+                        .map(fieldName -> ":" + fieldName + " " + 
quoteIdentifier(fieldName))
+                        .collect(Collectors.joining(", "));
+
+        String usingClause = String.format("SELECT %s FROM DUAL", 
valuesBinding);
+        String onConditions =
+                Arrays.stream(uniqueKeyFields)
+                        .map(
+                                fieldName ->
+                                        String.format(
+                                                "TARGET.%s=SOURCE.%s",
+                                                quoteIdentifier(fieldName),
+                                                quoteIdentifier(fieldName)))
+                        .collect(Collectors.joining(" AND "));
+        String updateSetClause =
+                nonUniqueKeyFields.stream()
+                        .map(
+                                fieldName ->
+                                        String.format(
+                                                "TARGET.%s=SOURCE.%s",
+                                                quoteIdentifier(fieldName),
+                                                quoteIdentifier(fieldName)))
+                        .collect(Collectors.joining(", "));
+        String insertFields =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String insertValues =
+                Arrays.stream(fieldNames)
+                        .map(fieldName -> "SOURCE." + 
quoteIdentifier(fieldName))
+                        .collect(Collectors.joining(", "));
+
+        String upsertSQL =
+                String.format(
+                        " MERGE INTO %s.%s TARGET"
+                                + " USING (%s) SOURCE"
+                                + " ON (%s) "
+                                + " WHEN MATCHED THEN"
+                                + " UPDATE SET %s"
+                                + " WHEN NOT MATCHED THEN"
+                                + " INSERT (%s) VALUES (%s)",
+                        database,
+                        tableName,
+                        usingClause,
+                        onConditions,
+                        updateSetClause,
+                        insertFields,
+                        insertValues);
+
+        return Optional.of(upsertSQL);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectFactory.java
new file mode 100644
index 000000000..3a602de94
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/** Factory for {@link VerticaDialect}. */
+@AutoService(JdbcDialectFactory.class)
+public class VerticaDialectFactory implements JdbcDialectFactory {
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:vertica:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new VerticaDialect();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaJdbcRowConverter.java
new file mode 100644
index 000000000..e8b63629b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaJdbcRowConverter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+public class VerticaJdbcRowConverter extends AbstractJdbcRowConverter {
+    @Override
+    public String converterName() {
+        return "Vertica";
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaTypeMapper.java
new file mode 100644
index 000000000..c83c4c4eb
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaTypeMapper.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class VerticaTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcDialect.class);
+
+    // ============================data types=====================
+    // refer to :
+    // 
https://www.vertica.com/docs/12.0.x/HTML/Content/Authoring/SQLReferenceManual/DataTypes/SQLDataTypes.htm
+
+    private static final String VERTICA_UNKNOWN = "UNKNOWN";
+    private static final String VERTICA_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    private static final String VERTICA_TINYINT = "TINYINT";
+    private static final String VERTICA_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String VERTICA_SMALLINT = "SMALLINT";
+    private static final String VERTICA_SMALLINT_UNSIGNED = "SMALLINT 
UNSIGNED";
+    private static final String VERTICA_MEDIUMINT = "MEDIUMINT";
+    private static final String VERTICA_MEDIUMINT_UNSIGNED = "MEDIUMINT 
UNSIGNED";
+    private static final String VERTICA_INT = "INT";
+    private static final String VERTICA_INT_UNSIGNED = "INT UNSIGNED";
+    private static final String VERTICA_INTEGER = "INTEGER";
+    private static final String VERTICA_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String VERTICA_BIGINT = "BIGINT";
+    private static final String VERTICA_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String VERTICA_DECIMAL = "DECIMAL";
+    private static final String VERTICA_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String VERTICA_FLOAT = "FLOAT";
+    private static final String VERTICA_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String VERTICA_DOUBLE = "DOUBLE";
+    private static final String VERTICA_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    private static final String VERTICA_CHAR = "CHAR";
+    private static final String VERTICA_VARCHAR = "VARCHAR";
+    private static final String VERTICA_TINYTEXT = "TINYTEXT";
+    private static final String VERTICA_MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String VERTICA_TEXT = "TEXT";
+    private static final String VERTICA_LONGTEXT = "LONGTEXT";
+    private static final String VERTICA_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    private static final String VERTICA_DATE = "DATE";
+    private static final String VERTICA_DATETIME = "DATETIME";
+    private static final String VERTICA_TIME = "TIME";
+    private static final String VERTICA_TIMESTAMP = "TIMESTAMP";
+    private static final String VERTICA_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    private static final String VERTICA_TINYBLOB = "TINYBLOB";
+    private static final String VERTICA_MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String VERTICA_BLOB = "BLOB";
+    private static final String VERTICA_LONGBLOB = "LONGBLOB";
+    private static final String VERTICA_BINARY = "BINARY";
+    private static final String VERTICA_VARBINARY = "VARBINARY";
+    private static final String VERTICA_GEOMETRY = "GEOMETRY";
+
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int 
colIndex)
+            throws SQLException {
+        String type = metadata.getColumnTypeName(colIndex).toUpperCase();
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+        switch (type) {
+            case VERTICA_BIT:
+                return BasicType.BOOLEAN_TYPE;
+            case VERTICA_TINYINT:
+            case VERTICA_TINYINT_UNSIGNED:
+            case VERTICA_SMALLINT:
+            case VERTICA_SMALLINT_UNSIGNED:
+            case VERTICA_MEDIUMINT:
+            case VERTICA_MEDIUMINT_UNSIGNED:
+            case VERTICA_INT:
+            case VERTICA_INTEGER:
+            case VERTICA_YEAR:
+                return BasicType.INT_TYPE;
+            case VERTICA_INT_UNSIGNED:
+            case VERTICA_INTEGER_UNSIGNED:
+            case VERTICA_BIGINT:
+                return BasicType.LONG_TYPE;
+            case VERTICA_BIGINT_UNSIGNED:
+                return new DecimalType(20, 0);
+            case VERTICA_DECIMAL:
+                if (precision > 38) {
+                    LOG.warn("{} will probably cause value overflow.", 
VERTICA_DECIMAL);
+                    return new DecimalType(38, 18);
+                }
+                return new DecimalType(precision, scale);
+            case VERTICA_DECIMAL_UNSIGNED:
+                return new DecimalType(precision + 1, scale);
+            case VERTICA_FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case VERTICA_FLOAT_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", 
VERTICA_FLOAT_UNSIGNED);
+                return BasicType.FLOAT_TYPE;
+            case VERTICA_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case VERTICA_DOUBLE_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", 
VERTICA_DOUBLE_UNSIGNED);
+                return BasicType.DOUBLE_TYPE;
+            case VERTICA_CHAR:
+            case VERTICA_TINYTEXT:
+            case VERTICA_MEDIUMTEXT:
+            case VERTICA_TEXT:
+            case VERTICA_VARCHAR:
+            case VERTICA_JSON:
+                return BasicType.STRING_TYPE;
+            case VERTICA_LONGTEXT:
+                LOG.warn(
+                        "Type '{}' has a maximum precision of 536870911 in 
Vertica. "
+                                + "Due to limitations in the seatunnel type 
system, "
+                                + "the precision will be set to 2147483647.",
+                        VERTICA_LONGTEXT);
+                return BasicType.STRING_TYPE;
+            case VERTICA_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case VERTICA_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case VERTICA_DATETIME:
+            case VERTICA_TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+
+            case VERTICA_TINYBLOB:
+            case VERTICA_MEDIUMBLOB:
+            case VERTICA_BLOB:
+            case VERTICA_LONGBLOB:
+            case VERTICA_VARBINARY:
+            case VERTICA_BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+
+                // Not supported yet
+            case VERTICA_GEOMETRY:
+            case VERTICA_UNKNOWN:
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw new JdbcConnectorException(
+                        CommonErrorCode.UNSUPPORTED_OPERATION,
+                        String.format(
+                                "Doesn't support Vertica type '%s' on column 
'%s'  yet.",
+                                type, jdbcColumnName));
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
index 0f7eaa138..8c203354b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
@@ -59,6 +59,11 @@
             <artifactId>mssql-jdbc</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.vertica.jdbc</groupId>
+            <artifactId>vertica-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
new file mode 100644
index 000000000..df87df516
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class JdbcVerticaIT extends AbstractJdbcIT {
+
+    private static final String VERTICA_IMAGE = "vertica/vertica-ce:latest";
+    private static final String VERTICA_CONTAINER_HOST = "e2e_vertica";
+
+    private static final String VERTICA_DATABASE = "VMart";
+    private static final String VERTICA_SCHEMA = "public";
+    private static final String VERTICA_SOURCE = "e2e_table_source";
+    private static final String VERTICA_SINK = "e2e_table_sink";
+    private static final String VERTICA_USERNAME = "DBADMIN";
+    private static final String VERTICA_PASSWORD = "";
+    private static final int VERTICA_PORT = 5433;
+    private static final String VERTICA_URL = "jdbc:vertica://" + HOST + 
":%s/%s";
+
+    private static final String DRIVER_CLASS = "com.vertica.jdbc.Driver";
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_vertica_source_and_sink.conf");
+    private static final String CREATE_SQL =
+            "create table if not exists %s\n"
+                    + "(\n"
+                    + "   id int,\n"
+                    + "   name varchar,\n"
+                    + "   age int\n"
+                    + ");";
+
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(VERTICA_URL, VERTICA_PORT, 
VERTICA_DATABASE);
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(VERTICA_SCHEMA, VERTICA_SOURCE, 
fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(VERTICA_IMAGE)
+                .networkAliases(VERTICA_CONTAINER_HOST)
+                .containerEnv(containerEnv)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(VERTICA_PORT)
+                .localPort(VERTICA_PORT)
+                .jdbcTemplate(VERTICA_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(VERTICA_USERNAME)
+                .password(VERTICA_PASSWORD)
+                .database(VERTICA_SCHEMA)
+                .sourceTable(VERTICA_SOURCE)
+                .sinkTable(VERTICA_SINK)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .build();
+    }
+
+    @Override
+    void compareResult() {}
+
+    @Override
+    String driverUrl() {
+        return 
"https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar";
+    }
+
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames = new String[] {"id", "name", "age"};
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i, // INT
+                                String.format("f1_%s", i), // VARCHAR
+                                i
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    @Override
+    protected GenericContainer<?> initContainer() {
+        GenericContainer<?> container =
+                new GenericContainer<>(VERTICA_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(VERTICA_CONTAINER_HOST)
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(VERTICA_IMAGE)));
+        container.setPortBindings(
+                Lists.newArrayList(String.format("%s:%s", VERTICA_PORT, 
VERTICA_PORT)));
+
+        return container;
+    }
+
+    @Override
+    public String quoteIdentifier(String field) {
+        return "\"" + field + "\"";
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_vertica_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_vertica_source_and_sink.conf
new file mode 100644
index 000000000..2f1dcde95
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_vertica_source_and_sink.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    url = "jdbc:vertica://e2e_vertica:5433"
+    driver = "com.vertica.jdbc.Driver"
+    connection_check_timeout_sec = 1000
+    user = "DBADMIN"
+    password = ""
+    query = """select id, name, age from e2e_table_source"""
+  }
+
+}
+
+sink {
+  Jdbc {
+    url = "jdbc:vertica://e2e_vertica:5433"
+    driver = "com.vertica.jdbc.Driver"
+    connection_check_timeout_sec = 1000
+    user = "DBADMIN"
+    password = ""
+    query = """INSERT INTO e2e_table_sink (id, name, age) VALUES (?, ?, ?);"""
+  }
+}
+

Reply via email to