This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new aa381cbfb4 [Feature][Connector-V2] Support the jdbc connector for
highgo db (#8282)
aa381cbfb4 is described below
commit aa381cbfb49bb78380dd29b0d15bb45b267499d2
Author: 丑西蒙 <[email protected]>
AuthorDate: Mon Dec 16 20:35:49 2024 +0800
[Feature][Connector-V2] Support the jdbc connector for highgo db (#8282)
---
docs/en/connector-v2/sink/Jdbc.md | 3 +-
docs/en/connector-v2/source/Jdbc.md | 1 +
docs/zh/connector-v2/sink/Jdbc.md | 44 +--
seatunnel-connectors-v2/connector-jdbc/pom.xml | 12 +-
.../jdbc/catalog/highgo/HighGoCatalog.java | 32 +++
.../jdbc/catalog/highgo/HighGoCatalogFactory.java | 56 ++++
.../jdbc/internal/dialect/DatabaseIdentifier.java | 1 +
.../dialect/highgo/HighGoDialectFactory.java | 30 ++
.../connector-jdbc-e2e-part-7/pom.xml | 5 +
.../connectors/seatunnel/jdbc/JdbcHighGoIT.java | 309 +++++++++++++++++++++
...jdbc_highgo_source_and_sink_with_full_type.conf | 65 +++++
11 files changed, 534 insertions(+), 24 deletions(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index 9b86a27721..9f2754c7f0 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -226,7 +226,7 @@ In the case of is_exactly_once = "true", Xa transactions
are used. This requires
there are some reference value for params above.
-| datasource | driver |
url |
xa_data_source_class_name | maven
|
+| datasource | driver | url
|
xa_data_source_class_name | maven
|
|-------------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
| MySQL | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
com.mysql.cj.jdbc.MysqlXADataSource |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| PostgreSQL | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/postgres |
org.postgresql.xa.PGXADataSource |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
@@ -249,6 +249,7 @@ there are some reference value for params above.
| xugu | com.xugu.cloudjdbc.Driver |
jdbc:xugu://localhost:5138 | /
|
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar
|
| InterSystems IRIS | com.intersystems.jdbc.IRISDriver |
jdbc:IRIS://localhost:1972/%SYS | /
|
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
|
| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres | /
|
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
+| Highgo | com.highgo.jdbc.Driver |
jdbc:highgo://localhost:5866/highgo | /
|
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar
|
## Example
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index b8fbed6d50..9a6463a5d6 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -138,6 +138,7 @@ there are some reference value for params above.
| xugu | com.xugu.cloudjdbc.Driver |
jdbc:xugu://localhost:5138 |
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar
|
| InterSystems IRIS | com.intersystems.jdbc.IRISDriver |
jdbc:IRIS://localhost:1972/%SYS |
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
|
| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres |
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
+| Highgo | com.highgo.jdbc.Driver |
jdbc:highgo://localhost:5866/highgo |
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar
|
## Example
diff --git a/docs/zh/connector-v2/sink/Jdbc.md
b/docs/zh/connector-v2/sink/Jdbc.md
index 4370af2002..73094aad61 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -216,28 +216,28 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
附录参数仅提供参考
-| 数据源 | driver |
url |
xa_data_source_class_name | maven
|
-|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|------------------------------------------------------------------------------------------------------|
-| MySQL | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
com.mysql.cj.jdbc.MysqlXADataSource |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
-| PostgreSQL | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/postgres |
org.postgresql.xa.PGXADataSource |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
-| DM | dm.jdbc.driver.DmDriver |
jdbc:dm://localhost:5236 |
dm.jdbc.driver.DmdbXADataSource |
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18
|
-| Phoenix | org.apache.phoenix.queryserver.client.Driver |
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /
|
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
|
-| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:sqlserver://localhost:1433 |
com.microsoft.sqlserver.jdbc.SQLServerXADataSource |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
-| Oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
oracle.jdbc.xa.OracleXADataSource |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
-| sqlite | org.sqlite.JDBC |
jdbc:sqlite:test.db | /
|
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
|
-| GBase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test | /
|
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
|
-| StarRocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
-| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
com.ibm.db2.jcc.DB2XADataSource |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
-| saphana | com.sap.db.jdbc.Driver |
jdbc:sap://localhost:39015 | /
|
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc
|
-| Doris | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
-| teradata | com.teradata.jdbc.TeraDriver |
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | /
|
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
|
-| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb |
com.amazon.redshift.xa.RedshiftXADataSource |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
-| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver |
jdbc:snowflake://<account_name>.snowflakecomputing.com | /
|
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
|
-| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
-| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test | /
|
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
-| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
|
-| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres | /
|
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
-
+| 数据源 | driver |
url |
xa_data_source_class_name | maven
|
+|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------|
+| MySQL | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
com.mysql.cj.jdbc.MysqlXADataSource |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| PostgreSQL | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/postgres |
org.postgresql.xa.PGXADataSource |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
+| DM | dm.jdbc.driver.DmDriver |
jdbc:dm://localhost:5236 |
dm.jdbc.driver.DmdbXADataSource |
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18
|
+| Phoenix | org.apache.phoenix.queryserver.client.Driver |
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /
|
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
|
+| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:sqlserver://localhost:1433 |
com.microsoft.sqlserver.jdbc.SQLServerXADataSource |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
+| Oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
oracle.jdbc.xa.OracleXADataSource |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
+| sqlite | org.sqlite.JDBC |
jdbc:sqlite:test.db | /
|
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
|
+| GBase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test | /
|
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
|
+| StarRocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
com.ibm.db2.jcc.DB2XADataSource |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
+| saphana | com.sap.db.jdbc.Driver |
jdbc:sap://localhost:39015 | /
|
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc
|
+| Doris | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| teradata | com.teradata.jdbc.TeraDriver |
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | /
|
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
|
+| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb |
com.amazon.redshift.xa.RedshiftXADataSource |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
+| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver |
jdbc:snowflake://<account_name>.snowflakecomputing.com | /
|
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
|
+| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
+| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test | /
|
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
+| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
|
+| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres | /
|
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
+| Highgo | com.highgo.jdbc.Driver |
jdbc:highgo://localhost:5866/highgo | /
|
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar
|
## 示例
简单示例
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 1f09fa2168..c584fddb04 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -55,7 +55,7 @@
<tikv.version>3.2.0</tikv.version>
<opengauss.jdbc.version>5.1.0-og</opengauss.jdbc.version>
<mariadb.jdbc.version>3.5.1</mariadb.jdbc.version>
-
+ <highgo.version>6.2.3</highgo.version>
</properties>
<dependencyManagement>
@@ -223,6 +223,12 @@
<version>${mariadb.jdbc.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.highgo</groupId>
+ <artifactId>HgdbJdbc</artifactId>
+ <version>${highgo.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -341,6 +347,10 @@
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.highgo</groupId>
+ <artifactId>HgdbJdbc</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalog.java
new file mode 100644
index 0000000000..b53f9d60a2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalog.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.highgo;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog;
+
+public class HighGoCatalog extends PostgresCatalog {
+
+ public HighGoCatalog(
+ String catalogName,
+ String username,
+ String pwd,
+ JdbcUrlUtil.UrlInfo urlInfo,
+ String defaultSchema) {
+ super(catalogName, username, pwd, urlInfo, defaultSchema);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalogFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalogFactory.java
new file mode 100644
index 0000000000..2ffa6a21b1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/highgo/HighGoCatalogFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.highgo;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import java.util.Optional;
+
+public class HighGoCatalogFactory implements CatalogFactory {
+
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
+ JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
+ Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
+ if (!defaultDatabase.isPresent()) {
+ throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
+ }
+ return new HighGoCatalog(
+ catalogName,
+ options.get(JdbcCatalogOptions.USERNAME),
+ options.get(JdbcCatalogOptions.PASSWORD),
+ urlInfo,
+ options.get(JdbcCatalogOptions.SCHEMA));
+ }
+
+ public String factoryIdentifier() {
+ return DatabaseIdentifier.HIGHGO;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return JdbcCatalogOptions.BASE_RULE.build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index 6f442e7672..0f5bf95010 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -44,4 +44,5 @@ public class DatabaseIdentifier {
public static final String IRIS = "IRIS";
public static final String INCEPTOR = "Inceptor";
public static final String OPENGAUSS = "OpenGauss";
+ public static final String HIGHGO = "Highgo";
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/highgo/HighGoDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/highgo/HighGoDialectFactory.java
new file mode 100644
index 0000000000..6f3376cefa
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/highgo/HighGoDialectFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.highgo;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(JdbcDialectFactory.class)
+public class HighGoDialectFactory extends PostgresDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:highgo:");
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
index 54929edfa3..d9ec7df954 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
@@ -106,6 +106,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.highgo</groupId>
+ <artifactId>HgdbJdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHighGoIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHighGoIT.java
new file mode 100644
index 0000000000..d3d05fd016
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHighGoIT.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.highgo.HighGoCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+public class JdbcHighGoIT extends AbstractJdbcIT {
+ protected static final String HIGHGO_IMAGE = "xuxuclassmate/highgo";
+
+ private static final String HIGHGO_ALIASES = "e2e_highgo";
+ private static final String DRIVER_CLASS = "com.highgo.jdbc.Driver";
+ private static final int HIGHGO_PORT = 5866;
+ private static final String HIGHGO_URL = "jdbc:highgo://" + HOST +
":%s/%s";
+ private static final String USERNAME = "highgo";
+ private static final String PASSWORD = "Highgo@123";
+ private static final String DATABASE = "highgo";
+ private static final String SCHEMA = "public";
+ private static final String SOURCE_TABLE = "highgo_e2e_source_table";
+ private static final String SINK_TABLE = "highgo_e2e_sink_table";
+ private static final String CATALOG_TABLE = "e2e_table_catalog";
+ private static final Integer GEN_ROWS = 100;
+ private static final List<String> CONFIG_FILE =
+
Lists.newArrayList("/jdbc_highgo_source_and_sink_with_full_type.conf");
+
+ private static final String CREATE_SQL =
+ "CREATE TABLE IF NOT EXISTS %s (\n"
+ + " gid SERIAL PRIMARY KEY,\n"
+ + " text_col TEXT,\n"
+ + " varchar_col VARCHAR(255),\n"
+ + " char_col CHAR(10),\n"
+ + " boolean_col bool,\n"
+ + " smallint_col int2,\n"
+ + " integer_col int4,\n"
+ + " bigint_col BIGINT,\n"
+ + " decimal_col DECIMAL(10, 2),\n"
+ + " numeric_col NUMERIC(8, 4),\n"
+ + " real_col float4,\n"
+ + " double_precision_col float8,\n"
+ + " smallserial_col SMALLSERIAL,\n"
+ + " bigserial_col BIGSERIAL,\n"
+ + " date_col DATE,\n"
+ + " timestamp_col TIMESTAMP,\n"
+ + " bpchar_col BPCHAR(10)\n"
+ + ");";
+
+ private static final String[] fieldNames =
+ new String[] {
+ "gid",
+ "text_col",
+ "varchar_col",
+ "char_col",
+ "boolean_col",
+ "smallint_col",
+ "integer_col",
+ "bigint_col",
+ "decimal_col",
+ "numeric_col",
+ "real_col",
+ "double_precision_col",
+ "smallserial_col",
+ "bigserial_col",
+ "date_col",
+ "timestamp_col",
+ "bpchar_col"
+ };
+
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib &&
cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+ + driverUrl());
+ Assertions.assertEquals(0, extraCommands.getExitCode(),
extraCommands.getStderr());
+ };
+
+ @Test
+ @Override
+ public void testCatalog() {
+ if (catalog == null) {
+ return;
+ }
+ TablePath sourceTablePath =
+ new TablePath(
+ jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSourceTable());
+ TablePath targetTablePath =
+ new TablePath(
+ jdbcCase.getCatalogDatabase(),
+ jdbcCase.getCatalogSchema(),
+ jdbcCase.getCatalogTable());
+
+ CatalogTable catalogTable = catalog.getTable(sourceTablePath);
+ catalog.createTable(targetTablePath, catalogTable, false);
+ Assertions.assertTrue(catalog.tableExists(targetTablePath));
+
+ catalog.dropTable(targetTablePath, false);
+ Assertions.assertFalse(catalog.tableExists(targetTablePath));
+ }
+
+ @Test
+ public void testCreateIndex() {
+ String schema = "public";
+ String databaseName = jdbcCase.getDatabase();
+ TablePath sourceTablePath = TablePath.of(databaseName, "public",
"highgo_e2e_source_table");
+ TablePath targetTablePath = TablePath.of(databaseName, "public",
"highgo_e2e_sink_table");
+ HighGoCatalog highGoCatalog = (HighGoCatalog) catalog;
+ CatalogTable catalogTable = highGoCatalog.getTable(sourceTablePath);
+ dropTableWithAssert(highGoCatalog, targetTablePath, true);
+ // not create index
+ createIndexOrNot(highGoCatalog, targetTablePath, catalogTable, false);
+ Assertions.assertFalse(hasIndex(highGoCatalog, targetTablePath));
+
+ dropTableWithAssert(highGoCatalog, targetTablePath, true);
+ // create index
+ createIndexOrNot(highGoCatalog, targetTablePath, catalogTable, true);
+ Assertions.assertTrue(hasIndex(highGoCatalog, targetTablePath));
+
+ dropTableWithAssert(highGoCatalog, targetTablePath, true);
+ }
+
+ protected boolean hasIndex(Catalog catalog, TablePath targetTablePath) {
+ TableSchema tableSchema =
catalog.getTable(targetTablePath).getTableSchema();
+ PrimaryKey primaryKey = tableSchema.getPrimaryKey();
+ List<ConstraintKey> constraintKeys = tableSchema.getConstraintKeys();
+ if (primaryKey != null &&
StringUtils.isNotBlank(primaryKey.getPrimaryKey())) {
+ return true;
+ }
+ if (!constraintKeys.isEmpty()) {
+ return true;
+ }
+ return false;
+ }
+
+ private void dropTableWithAssert(
+ HighGoCatalog highGoCatalog, TablePath targetTablePath, boolean
ignoreIfNotExists) {
+ highGoCatalog.dropTable(targetTablePath, ignoreIfNotExists);
+ Assertions.assertFalse(highGoCatalog.tableExists(targetTablePath));
+ }
+
+ private void createIndexOrNot(
+ HighGoCatalog highGoCatalog,
+ TablePath targetTablePath,
+ CatalogTable catalogTable,
+ boolean createIndex) {
+ highGoCatalog.createTable(targetTablePath, catalogTable, false,
createIndex);
+ Assertions.assertTrue(highGoCatalog.tableExists(targetTablePath));
+ }
+
+ @Override
+ JdbcCase getJdbcCase() {
+ String jdbcUrl = String.format(HIGHGO_URL, HIGHGO_PORT, DATABASE);
+ Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+ String[] fieldNames = testDataSet.getKey();
+
+ String insertSql = insertTable(SCHEMA, SOURCE_TABLE, fieldNames);
+
+ return JdbcCase.builder()
+ .dockerImage(HIGHGO_IMAGE)
+ .networkAliases(HIGHGO_ALIASES)
+ .driverClass(DRIVER_CLASS)
+ .host(HOST)
+ .port(HIGHGO_PORT)
+ .localPort(HIGHGO_PORT)
+ .jdbcTemplate(HIGHGO_URL)
+ .jdbcUrl(jdbcUrl)
+ .userName(USERNAME)
+ .password(PASSWORD)
+ .database(DATABASE)
+ .schema(SCHEMA)
+ .sourceTable(SOURCE_TABLE)
+ .sinkTable(SINK_TABLE)
+ .catalogDatabase(DATABASE)
+ .catalogSchema(SCHEMA)
+ .catalogTable(CATALOG_TABLE)
+ .createSql(CREATE_SQL)
+ .configFile(CONFIG_FILE)
+ .insertSql(insertSql)
+ .testData(testDataSet)
+ .build();
+ }
+
+ @Override
+ String driverUrl() {
+ return
"https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar";
+ }
+
+ @Override
+ protected Class<?> loadDriverClass() {
+ return super.loadDriverClassFromUrl();
+ }
+
+ @Override
+ Pair<String[], List<SeaTunnelRow>> initTestData() {
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ for (Integer i = 0; i < GEN_ROWS; i++) {
+ SeaTunnelRow row =
+ new SeaTunnelRow(
+ new Object[] {
+ i,
+ String.valueOf(i),
+ String.valueOf(i),
+ String.valueOf(i),
+ i % 2 == 0,
+ i,
+ i,
+ Long.valueOf(i),
+ BigDecimal.valueOf(i * 10.0),
+ BigDecimal.valueOf(i * 0.01),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.111"),
+ i,
+ Long.valueOf(i),
+ LocalDate.of(2024, 12, 12).atStartOfDay(),
+ LocalDateTime.of(2024, 12, 12, 10, 0),
+ "Testing"
+ });
+ rows.add(row);
+ }
+
+ return Pair.of(fieldNames, rows);
+ }
+
+ @Override
+ public String quoteIdentifier(String field) {
+ return "\"" + field + "\"";
+ }
+
+ @Override
+ protected void clearTable(String database, String schema, String table) {
+ clearTable(schema, table);
+ }
+
+ @Override
+ protected String buildTableInfoWithSchema(String database, String schema,
String table) {
+ return buildTableInfoWithSchema(schema, table);
+ }
+
+ @Override
+ GenericContainer<?> initContainer() {
+ GenericContainer<?> container =
+ new GenericContainer<>(HIGHGO_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HIGHGO_ALIASES)
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(HIGHGO_IMAGE)));
+ container.setPortBindings(
+ Lists.newArrayList(String.format("%s:%s", HIGHGO_PORT,
HIGHGO_PORT)));
+
+ return container;
+ }
+
+ @Override
+ protected void initCatalog() {
+ String jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST,
dbServer.getHost());
+ catalog =
+ new HighGoCatalog(
+ DatabaseIdentifier.HIGHGO,
+ jdbcCase.getUserName(),
+ jdbcCase.getPassword(),
+ JdbcUrlUtil.getUrlInfo(jdbcUrl),
+ SCHEMA);
+ catalog.open();
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_highgo_source_and_sink_with_full_type.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_highgo_source_and_sink_with_full_type.conf
new file mode 100644
index 0000000000..f6652ca970
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_highgo_source_and_sink_with_full_type.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ jdbc {
+ url = "jdbc:highgo://e2e_Highgo:5866/highgo"
+ driver = "com.highgo.jdbc.Driver"
+ connection_check_timeout_sec = 100
+ user = "highgo"
+ password = "Highgo@123"
+ query = "select * from public.highgo_e2e_source_table"
+ split.size = 10
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:highgo://e2e_Highgo:5866/highgo"
+ driver = "com.highgo.jdbc.Driver"
+ user = "highgo"
+ password = "Highgo@123"
+ database = "highgo"
+ table = "public.highgo_e2e_sink_table"
+ generate_sink_sql = true
+ }
+}