This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e6b4f9872 [Feature][Connector][Jdbc] Add vertica connector. (#4303)
e6b4f9872 is described below
commit e6b4f98721100454c709d55c77c63ff8543268a3
Author: FlechazoW <[email protected]>
AuthorDate: Mon Apr 17 10:53:33 2023 +0800
[Feature][Connector][Jdbc] Add vertica connector. (#4303)
* [Feature][Connector][Jdbc] Add vertica connector.
---
docs/en/connector-v2/sink/Jdbc.md | 4 +-
docs/en/connector-v2/source/Jdbc.md | 2 +
release-note.md | 1 +
seatunnel-connectors-v2/connector-jdbc/pom.xml | 12 ++
.../internal/dialect/vertica/VerticaDialect.java | 109 +++++++++++++
.../dialect/vertica/VerticaDialectFactory.java | 37 +++++
.../dialect/vertica/VerticaJdbcRowConverter.java | 27 ++++
.../dialect/vertica/VerticaTypeMapper.java | 177 +++++++++++++++++++++
.../connector-jdbc-e2e-part-3/pom.xml | 5 +
.../connectors/seatunnel/jdbc/JdbcVerticaIT.java | 138 ++++++++++++++++
.../resources/jdbc_vertica_source_and_sink.conf | 45 ++++++
11 files changed, 556 insertions(+), 1 deletion(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index f2f1b0f3b..d5b17c757 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -86,7 +86,7 @@ This option is used to support operations such as `insert`,
`delete`, and `updat
### support_upsert_by_query_primary_key_exist [boolean]
-Choose to use INSERT sql, UPDATE sql to process update events(INSERT,
UPDATE_AFTER) based on query primary key exists. This configuration is only
used when database unsupport upsert syntax.
+Choose to use INSERT sql, UPDATE sql to process update events(INSERT,
UPDATE_AFTER) based on query primary key exists. This configuration is only
used when the database does not support upsert syntax.
**Note**: that this method has low performance
### connection_check_timeout_sec [int]
@@ -161,6 +161,7 @@ there are some reference value for params above.
| Doris | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| teradata | com.teradata.jdbc.TeraDriver |
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | /
|
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
|
| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb |
com.amazon.redshift.xa.RedshiftXADataSource |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
+| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
## Example
@@ -238,4 +239,5 @@ sink {
- [Feature] Support Redshift JDBC
Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
- [Improve] Add config item enable upsert by
query([#3708](https://github.com/apache/incubator-seatunnel/pull/3708))
- [Improve] Add database field to sink
config([#4199](https://github.com/apache/incubator-seatunnel/pull/4199))
+- [Improve] Add Vertica
connector([#4303](https://github.com/apache/incubator-seatunnel/pull/4303))
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 610125980..183a97d3b 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -118,6 +118,7 @@ there are some reference value for params above.
| doris | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| teradata | com.teradata.jdbc.TeraDriver |
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test |
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
|
| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
+| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 |
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
## Example
@@ -174,4 +175,5 @@ Jdbc {
- [Feature] Support Doris JDBC Source
([3586](https://github.com/apache/incubator-seatunnel/pull/3586))
- [Feature] Support Redshift JDBC
Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
- [BugFix] Fix jdbc connection reset bug
([3670](https://github.com/apache/incubator-seatunnel/pull/3670))
+- [Improve] Add Vertica
connector([#4303](https://github.com/apache/incubator-seatunnel/pull/4303))
diff --git a/release-note.md b/release-note.md
index 5554c24d8..f2dec47af 100644
--- a/release-note.md
+++ b/release-note.md
@@ -20,6 +20,7 @@
- [Github] Add Github source connector #4155
- [CDC] Support export debezium-json format to kafka #4339
- [RocketMQ] Add RocketMQ source and sink connector #4007
+- [Jdbc] Add vertica connector #4303
### Formats
- [Canal]Support read canal format message #3950
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 0b9733c6e..f92f605a4 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -43,6 +43,7 @@
<teradata.version>17.20.00.12</teradata.version>
<redshift.version>2.1.0.9</redshift.version>
<saphana.version>2.14.7</saphana.version>
+ <vertica.version>12.0.3-0</vertica.version>
</properties>
<dependencyManagement>
@@ -122,6 +123,12 @@
<version>${saphana.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.vertica.jdbc</groupId>
+ <artifactId>vertica-jdbc</artifactId>
+ <version>${vertica.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -184,5 +191,10 @@
<groupId>com.sap.cloud.db.jdbc</groupId>
<artifactId>ngdbc</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.vertica.jdbc</groupId>
+ <artifactId>vertica-jdbc</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
new file mode 100644
index 000000000..282594ccf
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class VerticaDialect implements JdbcDialect {
+ @Override
+ public String dialectName() {
+ return "Vertica";
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter() {
+ return new VerticaJdbcRowConverter();
+ }
+
+ @Override
+ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+ return new VerticaTypeMapper();
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return "\"" + identifier + "\"";
+ }
+
+ @Override
+ public Optional<String> getUpsertStatement(
+ String database, String tableName, String[] fieldNames, String[]
uniqueKeyFields) {
+ List<String> nonUniqueKeyFields =
+ Arrays.stream(fieldNames)
+ .filter(fieldName ->
!Arrays.asList(uniqueKeyFields).contains(fieldName))
+ .collect(Collectors.toList());
+ String valuesBinding =
+ Arrays.stream(fieldNames)
+ .map(fieldName -> ":" + fieldName + " " +
quoteIdentifier(fieldName))
+ .collect(Collectors.joining(", "));
+
+ String usingClause = String.format("SELECT %s FROM DUAL",
valuesBinding);
+ String onConditions =
+ Arrays.stream(uniqueKeyFields)
+ .map(
+ fieldName ->
+ String.format(
+ "TARGET.%s=SOURCE.%s",
+ quoteIdentifier(fieldName),
+ quoteIdentifier(fieldName)))
+ .collect(Collectors.joining(" AND "));
+ String updateSetClause =
+ nonUniqueKeyFields.stream()
+ .map(
+ fieldName ->
+ String.format(
+ "TARGET.%s=SOURCE.%s",
+ quoteIdentifier(fieldName),
+ quoteIdentifier(fieldName)))
+ .collect(Collectors.joining(", "));
+ String insertFields =
+ Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String insertValues =
+ Arrays.stream(fieldNames)
+ .map(fieldName -> "SOURCE." +
quoteIdentifier(fieldName))
+ .collect(Collectors.joining(", "));
+
+ String upsertSQL =
+ String.format(
+ " MERGE INTO %s.%s TARGET"
+ + " USING (%s) SOURCE"
+ + " ON (%s) "
+ + " WHEN MATCHED THEN"
+ + " UPDATE SET %s"
+ + " WHEN NOT MATCHED THEN"
+ + " INSERT (%s) VALUES (%s)",
+ database,
+ tableName,
+ usingClause,
+ onConditions,
+ updateSetClause,
+ insertFields,
+ insertValues);
+
+ return Optional.of(upsertSQL);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectFactory.java
new file mode 100644
index 000000000..3a602de94
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/** Factory for {@link VerticaDialect}. */
+@AutoService(JdbcDialectFactory.class)
+public class VerticaDialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:vertica:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new VerticaDialect();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaJdbcRowConverter.java
new file mode 100644
index 000000000..e8b63629b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaJdbcRowConverter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+public class VerticaJdbcRowConverter extends AbstractJdbcRowConverter {
+ @Override
+ public String converterName() {
+ return "Vertica";
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaTypeMapper.java
new file mode 100644
index 000000000..c83c4c4eb
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaTypeMapper.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class VerticaTypeMapper implements JdbcDialectTypeMapper {
+
+    // Log under this mapper's own class so its warnings are not attributed to JdbcDialect.
+    private static final Logger LOG = LoggerFactory.getLogger(VerticaTypeMapper.class);
+
+ // ============================data types=====================
+ // refer to :
+ //
https://www.vertica.com/docs/12.0.x/HTML/Content/Authoring/SQLReferenceManual/DataTypes/SQLDataTypes.htm
+
+ private static final String VERTICA_UNKNOWN = "UNKNOWN";
+ private static final String VERTICA_BIT = "BIT";
+
+ // -------------------------number----------------------------
+ private static final String VERTICA_TINYINT = "TINYINT";
+ private static final String VERTICA_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+ private static final String VERTICA_SMALLINT = "SMALLINT";
+ private static final String VERTICA_SMALLINT_UNSIGNED = "SMALLINT
UNSIGNED";
+ private static final String VERTICA_MEDIUMINT = "MEDIUMINT";
+ private static final String VERTICA_MEDIUMINT_UNSIGNED = "MEDIUMINT
UNSIGNED";
+ private static final String VERTICA_INT = "INT";
+ private static final String VERTICA_INT_UNSIGNED = "INT UNSIGNED";
+ private static final String VERTICA_INTEGER = "INTEGER";
+ private static final String VERTICA_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+ private static final String VERTICA_BIGINT = "BIGINT";
+ private static final String VERTICA_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ private static final String VERTICA_DECIMAL = "DECIMAL";
+ private static final String VERTICA_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+ private static final String VERTICA_FLOAT = "FLOAT";
+ private static final String VERTICA_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+ private static final String VERTICA_DOUBLE = "DOUBLE";
+ private static final String VERTICA_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+ // -------------------------string----------------------------
+ private static final String VERTICA_CHAR = "CHAR";
+ private static final String VERTICA_VARCHAR = "VARCHAR";
+ private static final String VERTICA_TINYTEXT = "TINYTEXT";
+ private static final String VERTICA_MEDIUMTEXT = "MEDIUMTEXT";
+ private static final String VERTICA_TEXT = "TEXT";
+ private static final String VERTICA_LONGTEXT = "LONGTEXT";
+ private static final String VERTICA_JSON = "JSON";
+
+ // ------------------------------time-------------------------
+ private static final String VERTICA_DATE = "DATE";
+ private static final String VERTICA_DATETIME = "DATETIME";
+ private static final String VERTICA_TIME = "TIME";
+ private static final String VERTICA_TIMESTAMP = "TIMESTAMP";
+ private static final String VERTICA_YEAR = "YEAR";
+
+ // ------------------------------blob-------------------------
+ private static final String VERTICA_TINYBLOB = "TINYBLOB";
+ private static final String VERTICA_MEDIUMBLOB = "MEDIUMBLOB";
+ private static final String VERTICA_BLOB = "BLOB";
+ private static final String VERTICA_LONGBLOB = "LONGBLOB";
+ private static final String VERTICA_BINARY = "BINARY";
+ private static final String VERTICA_VARBINARY = "VARBINARY";
+ private static final String VERTICA_GEOMETRY = "GEOMETRY";
+
+    /**
+     * Maps the column at {@code colIndex} to a SeaTunnel data type, based on the type name
+     * reported by the JDBC driver.
+     *
+     * @param metadata result set metadata providing the column type name, precision and scale
+     * @param colIndex column index, passed through unchanged to {@link ResultSetMetaData}
+     * @return the SeaTunnel type used to represent the column
+     * @throws SQLException if the metadata cannot be read
+     * @throws JdbcConnectorException if the reported type name is not handled by this mapper
+     */
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        // Upper-case with a fixed locale: the default-locale overload maps "i" to a dotted
+        // capital under e.g. a Turkish locale, so "int" would no longer match VERTICA_INT.
+        String type = metadata.getColumnTypeName(colIndex).toUpperCase(java.util.Locale.ROOT);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+        switch (type) {
+            case VERTICA_BIT:
+                return BasicType.BOOLEAN_TYPE;
+            case VERTICA_TINYINT:
+            case VERTICA_TINYINT_UNSIGNED:
+            case VERTICA_SMALLINT:
+            case VERTICA_SMALLINT_UNSIGNED:
+            case VERTICA_MEDIUMINT:
+            case VERTICA_MEDIUMINT_UNSIGNED:
+            case VERTICA_INT:
+            case VERTICA_INTEGER:
+            case VERTICA_YEAR:
+                return BasicType.INT_TYPE;
+            case VERTICA_INT_UNSIGNED:
+            case VERTICA_INTEGER_UNSIGNED:
+            case VERTICA_BIGINT:
+                return BasicType.LONG_TYPE;
+            case VERTICA_BIGINT_UNSIGNED:
+                return new DecimalType(20, 0);
+            case VERTICA_DECIMAL:
+                // Precisions above 38 are clamped to a fixed (38, 18) decimal.
+                if (precision > 38) {
+                    LOG.warn("{} will probably cause value overflow.", VERTICA_DECIMAL);
+                    return new DecimalType(38, 18);
+                }
+                return new DecimalType(precision, scale);
+            case VERTICA_DECIMAL_UNSIGNED:
+                return new DecimalType(precision + 1, scale);
+            case VERTICA_FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case VERTICA_FLOAT_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", VERTICA_FLOAT_UNSIGNED);
+                return BasicType.FLOAT_TYPE;
+            case VERTICA_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case VERTICA_DOUBLE_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", VERTICA_DOUBLE_UNSIGNED);
+                return BasicType.DOUBLE_TYPE;
+            case VERTICA_CHAR:
+            case VERTICA_TINYTEXT:
+            case VERTICA_MEDIUMTEXT:
+            case VERTICA_TEXT:
+            case VERTICA_VARCHAR:
+            case VERTICA_JSON:
+                return BasicType.STRING_TYPE;
+            case VERTICA_LONGTEXT:
+                LOG.warn(
+                        "Type '{}' has a maximum precision of 536870911 in Vertica. "
+                                + "Due to limitations in the seatunnel type system, "
+                                + "the precision will be set to 2147483647.",
+                        VERTICA_LONGTEXT);
+                return BasicType.STRING_TYPE;
+            case VERTICA_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case VERTICA_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case VERTICA_DATETIME:
+            case VERTICA_TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+
+            case VERTICA_TINYBLOB:
+            case VERTICA_MEDIUMBLOB:
+            case VERTICA_BLOB:
+            case VERTICA_LONGBLOB:
+            case VERTICA_VARBINARY:
+            case VERTICA_BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+
+                // Not supported yet: these fall through to the same error as any unknown type.
+            case VERTICA_GEOMETRY:
+            case VERTICA_UNKNOWN:
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw new JdbcConnectorException(
+                        CommonErrorCode.UNSUPPORTED_OPERATION,
+                        String.format(
+                                "Doesn't support Vertica type '%s' on column '%s' yet.",
+                                type, jdbcColumnName));
+        }
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
index 0f7eaa138..8c203354b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
@@ -59,6 +59,11 @@
<artifactId>mssql-jdbc</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.vertica.jdbc</groupId>
+ <artifactId>vertica-jdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
new file mode 100644
index 000000000..df87df516
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class JdbcVerticaIT extends AbstractJdbcIT {
+
+ private static final String VERTICA_IMAGE = "vertica/vertica-ce:latest";
+ private static final String VERTICA_CONTAINER_HOST = "e2e_vertica";
+
+ private static final String VERTICA_DATABASE = "VMart";
+ private static final String VERTICA_SCHEMA = "public";
+ private static final String VERTICA_SOURCE = "e2e_table_source";
+ private static final String VERTICA_SINK = "e2e_table_sink";
+ private static final String VERTICA_USERNAME = "DBADMIN";
+ private static final String VERTICA_PASSWORD = "";
+ private static final int VERTICA_PORT = 5433;
+ private static final String VERTICA_URL = "jdbc:vertica://" + HOST +
":%s/%s";
+
+ private static final String DRIVER_CLASS = "com.vertica.jdbc.Driver";
+
+ private static final List<String> CONFIG_FILE =
+ Lists.newArrayList("/jdbc_vertica_source_and_sink.conf");
+ private static final String CREATE_SQL =
+ "create table if not exists %s\n"
+ + "(\n"
+ + " id int,\n"
+ + " name varchar,\n"
+ + " age int\n"
+ + ");";
+
+ @Override
+ JdbcCase getJdbcCase() {
+ Map<String, String> containerEnv = new HashMap<>();
+ String jdbcUrl = String.format(VERTICA_URL, VERTICA_PORT,
VERTICA_DATABASE);
+ Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+ String[] fieldNames = testDataSet.getKey();
+
+ String insertSql = insertTable(VERTICA_SCHEMA, VERTICA_SOURCE,
fieldNames);
+
+ return JdbcCase.builder()
+ .dockerImage(VERTICA_IMAGE)
+ .networkAliases(VERTICA_CONTAINER_HOST)
+ .containerEnv(containerEnv)
+ .driverClass(DRIVER_CLASS)
+ .host(HOST)
+ .port(VERTICA_PORT)
+ .localPort(VERTICA_PORT)
+ .jdbcTemplate(VERTICA_URL)
+ .jdbcUrl(jdbcUrl)
+ .userName(VERTICA_USERNAME)
+ .password(VERTICA_PASSWORD)
+ .database(VERTICA_SCHEMA)
+ .sourceTable(VERTICA_SOURCE)
+ .sinkTable(VERTICA_SINK)
+ .createSql(CREATE_SQL)
+ .configFile(CONFIG_FILE)
+ .insertSql(insertSql)
+ .testData(testDataSet)
+ .build();
+ }
+
+ @Override
+ void compareResult() {}
+
+ @Override
+ String driverUrl() {
+ return
"https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar";
+ }
+
+ @Override
+ Pair<String[], List<SeaTunnelRow>> initTestData() {
+ String[] fieldNames = new String[] {"id", "name", "age"};
+
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row =
+ new SeaTunnelRow(
+ new Object[] {
+ i, // INT
+ String.format("f1_%s", i), // VARCHAR
+ i
+ });
+ rows.add(row);
+ }
+
+ return Pair.of(fieldNames, rows);
+ }
+
+ @Override
+ protected GenericContainer<?> initContainer() {
+ GenericContainer<?> container =
+ new GenericContainer<>(VERTICA_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(VERTICA_CONTAINER_HOST)
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(VERTICA_IMAGE)));
+ container.setPortBindings(
+ Lists.newArrayList(String.format("%s:%s", VERTICA_PORT,
VERTICA_PORT)));
+
+ return container;
+ }
+
+ @Override
+ public String quoteIdentifier(String field) {
+ return "\"" + field + "\"";
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_vertica_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_vertica_source_and_sink.conf
new file mode 100644
index 000000000..2f1dcde95
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_vertica_source_and_sink.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:vertica://e2e_vertica:5433"
+ driver = "com.vertica.jdbc.Driver"
+ connection_check_timeout_sec = 1000
+ user = "DBADMIN"
+ password = ""
+ query = """select id, name, age from e2e_table_source"""
+ }
+
+}
+
+sink {
+ Jdbc {
+ url = "jdbc:vertica://e2e_vertica:5433"
+ driver = "com.vertica.jdbc.Driver"
+ connection_check_timeout_sec = 1000
+ user = "DBADMIN"
+ password = ""
+ query = """INSERT INTO e2e_table_sink (id, name, age) VALUES (?, ?, ?);"""
+ }
+}
+