This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7e0c20a488 [Feature][Connector-V2] jdbc saphana source tablepath
support view and synonym (#7670)
7e0c20a488 is described below
commit 7e0c20a488a86e6a0108d05aa8f5688c7aa0ca30
Author: 老王 <[email protected]>
AuthorDate: Wed Sep 18 19:20:01 2024 +0800
[Feature][Connector-V2] jdbc saphana source tablepath support view and
synonym (#7670)
---
.../jdbc/catalog/AbstractJdbcCatalog.java | 36 +++++++
.../jdbc/catalog/saphana/SapHanaCatalog.java | 115 +++++++++++++++++++++
.../connectors/seatunnel/jdbc/JdbcHanaIT.java | 46 ++++++++-
.../jdbc_sap_hana_test_view_and_synonym.conf | 59 +++++++++++
4 files changed, 255 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index e971c13893..260be79042 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -260,6 +260,14 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
throw new UnsupportedOperationException();
}
+ /**
+  * Returns the SQL statement that lists the views of the given database.
+  *
+  * <p>This base implementation provides no statement and always throws; catalogs that can
+  * enumerate views override it.
+  *
+  * @param databaseName name of the database whose views should be listed
+  * @return never returns normally in this base implementation
+  * @throws UnsupportedOperationException always, carrying the catalog name for diagnosis
+  */
+ protected String getListViewSql(String databaseName) {
+     throw new UnsupportedOperationException(
+             String.format("Catalog %s does not support listing views", this.catalogName));
+ }
+
+ /**
+  * Returns the SQL statement that lists the synonyms of the given database.
+  *
+  * <p>This base implementation provides no statement and always throws; catalogs that can
+  * enumerate synonyms override it.
+  *
+  * @param databaseName name of the database whose synonyms should be listed
+  * @return never returns normally in this base implementation
+  * @throws UnsupportedOperationException always, carrying the catalog name for diagnosis
+  */
+ protected String getListSynonymSql(String databaseName) {
+     throw new UnsupportedOperationException(
+             String.format("Catalog %s does not support listing synonyms", this.catalogName));
+ }
+
/**
 * Hook for a database lookup statement (judging by the name, SQL that selects the database
 * called {@code databaseName}; confirm against the overriding catalogs).
 *
 * <p>This base version builds no SQL: it always throws the exception produced by
 * {@code CommonError.unsupportedMethod} for this catalog's name and this method's name.
 */
protected String getDatabaseWithConditionSql(String databaseName) {
throw CommonError.unsupportedMethod(this.catalogName,
"getDatabaseWithConditionSql");
}
@@ -331,6 +339,34 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
}
}
+ /**
+  * Lists the names of all views in the given database.
+  *
+  * @param databaseName database to inspect
+  * @return the view names, one per row of the statement returned by {@code getListViewSql}
+  * @throws DatabaseNotExistException if {@code databaseName} does not exist
+  * @throws CatalogException if building or running the listing statement fails
+  */
+ public List<String> listViews(String databaseName)
+         throws CatalogException, DatabaseNotExistException {
+     if (!databaseExists(databaseName)) {
+         throw new DatabaseNotExistException(this.catalogName, databaseName);
+     }
+     String dbUrl = getUrlFromDatabaseName(databaseName);
+     try {
+         return queryString(dbUrl, getListViewSql(databaseName), this::getTableName);
+     } catch (Exception e) {
+         // Name the operation and the database; the previous text claimed a database listing.
+         throw new CatalogException(
+                 String.format(
+                         "Failed listing views of database %s in catalog %s",
+                         databaseName, catalogName),
+                 e);
+     }
+ }
+
+ /**
+  * Lists the names of all synonyms in the given database.
+  *
+  * @param databaseName database to inspect
+  * @return the synonym names, one per row of the statement returned by
+  *     {@code getListSynonymSql}
+  * @throws DatabaseNotExistException if {@code databaseName} does not exist
+  * @throws CatalogException if building or running the listing statement fails
+  */
+ public List<String> listSynonym(String databaseName)
+         throws CatalogException, DatabaseNotExistException {
+     if (!databaseExists(databaseName)) {
+         throw new DatabaseNotExistException(this.catalogName, databaseName);
+     }
+     String dbUrl = getUrlFromDatabaseName(databaseName);
+     try {
+         return queryString(dbUrl, getListSynonymSql(databaseName), this::getTableName);
+     } catch (Exception e) {
+         // Name the operation and the database; the previous text claimed a database listing.
+         throw new CatalogException(
+                 String.format(
+                         "Failed listing synonyms of database %s in catalog %s",
+                         databaseName, catalogName),
+                 e);
+     }
+ }
+
@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
String databaseName = tablePath.getDatabaseName();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
index 70b01b397e..fce7e78eeb 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
@@ -21,22 +21,35 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.saphana;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper;
+import org.apache.commons.lang3.StringUtils;
+
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter.appendColumnSizeIfNeed;
@@ -123,6 +136,18 @@ public class SapHanaCatalog extends AbstractJdbcCatalog {
"SELECT TABLE_NAME FROM TABLES WHERE SCHEMA_NAME = '%s'",
databaseName);
}
+ /**
+  * Builds the query that selects every VIEW_NAME of one schema from SYS.VIEWS.
+  *
+  * @param databaseName schema name substituted into the SCHEMA_NAME filter
+  * @return the formatted SELECT statement
+  */
+ @Override
+ public String getListViewSql(String databaseName) {
+     final String viewQueryTemplate =
+             "SELECT VIEW_NAME FROM SYS.VIEWS WHERE SCHEMA_NAME = '%s'";
+     return String.format(viewQueryTemplate, databaseName);
+ }
+
+ /**
+  * Builds the query that selects every SYNONYM_NAME of one schema from SYNONYMS.
+  *
+  * @param databaseName schema name substituted into the SCHEMA_NAME filter
+  * @return the formatted SELECT statement
+  */
+ @Override
+ public String getListSynonymSql(String databaseName) {
+     final String synonymQueryTemplate =
+             "SELECT SYNONYM_NAME FROM SYNONYMS WHERE SCHEMA_NAME = '%s'";
+     return String.format(synonymQueryTemplate, databaseName);
+ }
+
@Override
protected String getTableName(ResultSet rs) throws SQLException {
return rs.getString(1);
@@ -134,6 +159,96 @@ public class SapHanaCatalog extends AbstractJdbcCatalog {
SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(),
tablePath.getTableName());
}
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ try {
+ if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
+ return querySQLResultExists(
+
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
+ getTableWithConditionSql(tablePath))
+ || querySQLResultExists(
+
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
+ String.format(
+
getListViewSql(tablePath.getDatabaseName())
+ + " AND VIEW_NAME = '%s'",
+ tablePath.getTableName()))
+ || querySQLResultExists(
+
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
+ String.format(
+
getListSynonymSql(tablePath.getDatabaseName())
+ + " AND SYNONYM_NAME = '%s'",
+ tablePath.getSchemaAndTableName()));
+ }
+ return querySQLResultExists(
+ this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
+ getTableWithConditionSql(tablePath));
+ } catch (DatabaseNotExistException e) {
+ return false;
+ } catch (SQLException e) {
+ throw new SeaTunnelException("Failed to querySQLResult", e);
+ }
+ }
+
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ String dbUrl;
+ if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
+ dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+ } else {
+ dbUrl = getUrlFromDatabaseName(defaultDatabase);
+ }
+ Connection conn = getConnection(dbUrl);
+ TablePath originalTablePath = tablePath;
+ if
(listSynonym(tablePath.getDatabaseName()).contains(tablePath.getTableName())) {
+ String sql =
+ String.format(
+ "SELECT SYNONYM_NAME, SCHEMA_NAME, OBJECT_NAME,
OBJECT_SCHEMA FROM SYNONYMS WHERE SCHEMA_NAME = '%s' AND SYNONYM_NAME = '%s'
",
+ tablePath.getDatabaseName(),
tablePath.getTableName());
+ try (PreparedStatement statement = conn.prepareStatement(sql);
+ final ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ final String refDatabaseName =
resultSet.getString("OBJECT_SCHEMA");
+ final String refTableName =
resultSet.getString("OBJECT_NAME");
+ tablePath = TablePath.of(refDatabaseName, refTableName);
+ }
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed getting SYNONYM %s",
tablePath.getFullName()), e);
+ }
+ }
+ try {
+ DatabaseMetaData metaData = conn.getMetaData();
+ Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData,
tablePath);
+ List<ConstraintKey> constraintKeys = getConstraintKeys(metaData,
tablePath);
+ try (PreparedStatement ps =
conn.prepareStatement(getSelectColumnsSql(tablePath));
+ ResultSet resultSet = ps.executeQuery()) {
+
+ TableSchema.Builder builder = TableSchema.builder();
+ buildColumnsWithErrorCheck(tablePath, resultSet, builder);
+ // add primary key
+ primaryKey.ifPresent(builder::primaryKey);
+ // add constraint key
+ constraintKeys.forEach(builder::constraintKey);
+ TableIdentifier tableIdentifier =
getTableIdentifier(originalTablePath);
+ return CatalogTable.of(
+ tableIdentifier,
+ builder.build(),
+ buildConnectorOptions(tablePath),
+ Collections.emptyList(),
+ "",
+ catalogName);
+ }
+ } catch (SeaTunnelRuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed getting table %s",
tablePath.getFullName()), e);
+ }
+ }
+
@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
index d2b0667795..49b4cb1763 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper;
@@ -38,6 +39,7 @@ import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import java.sql.Date;
+import java.sql.Statement;
import java.time.Duration;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
@@ -56,7 +58,9 @@ public class JdbcHanaIT extends AbstractJdbcIT {
private static final String SOURCE_TABLE = "ALLDATATYPES";
private static final List<String> CONFIG_FILE =
- Lists.newArrayList("/jdbc_sap_hana_source_and_sink.conf");
+ Lists.newArrayList(
+ "/jdbc_sap_hana_source_and_sink.conf",
+ "/jdbc_sap_hana_test_view_and_synonym.conf");
// TODO The current Docker image cannot handle the annotated type normally,
// but the corresponding type can be handled normally on the standard
HANA service
@@ -214,6 +218,46 @@ public class JdbcHanaIT extends AbstractJdbcIT {
}
}
+ /**
+  * Creates the source table, the sink table unless save mode is configured to create it,
+  * and then the view and synonym over TEST.ALLDATATYPES that the view/synonym job config
+  * reads, committing at the end.
+  *
+  * <p>Any failure is logged and rethrown as a {@code SeaTunnelRuntimeException} with
+  * {@code JdbcITErrorCode.CREATE_TABLE_FAILED}.
+  */
+ protected void createNeededTables() {
+     try (Statement stmt = connection.createStatement()) {
+         String ddlTemplate = jdbcCase.getCreateSql();
+
+         String sourceTableInfo =
+                 buildTableInfoWithSchema(
+                         jdbcCase.getDatabase(),
+                         jdbcCase.getSchema(),
+                         jdbcCase.getSourceTable());
+         stmt.execute(String.format(ddlTemplate, sourceTableInfo));
+
+         if (!jdbcCase.isUseSaveModeCreateTable()) {
+             // A dedicated sink DDL template, when present, replaces the source one.
+             if (jdbcCase.getSinkCreateSql() != null) {
+                 ddlTemplate = jdbcCase.getSinkCreateSql();
+             }
+             String sinkTableInfo =
+                     buildTableInfoWithSchema(
+                             jdbcCase.getDatabase(),
+                             jdbcCase.getSchema(),
+                             jdbcCase.getSinkTable());
+             stmt.execute(String.format(ddlTemplate, sinkTableInfo));
+         }
+
+         // create view and synonym
+         stmt.execute("CREATE VIEW TEST.ALLDATATYPES_VIEW AS SELECT * FROM TEST.ALLDATATYPES;");
+         stmt.execute("CREATE SYNONYM TEST.ALLDATATYPES_SYNONYM FOR TEST.ALLDATATYPES;");
+         connection.commit();
+     } catch (Exception exception) {
+         log.error(ExceptionUtils.getMessage(exception));
+         throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
+     }
+ }
+
@Override
GenericContainer<?> initContainer() {
GenericContainer<?> container =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/resources/jdbc_sap_hana_test_view_and_synonym.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/resources/jdbc_sap_hana_test_view_and_synonym.conf
new file mode 100644
index 0000000000..aec552c208
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/resources/jdbc_sap_hana_test_view_and_synonym.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:sap://e2e_saphana:39017"
+ driver = "com.sap.db.jdbc.Driver"
+ connection_check_timeout_sec = 1000
+ user = "SYSTEM"
+ password = "testPassword123"
+ "table_list"=[
+ {
+ "table_path"="TEST.ALLDATATYPES_VIEW"
+ },
+ {
+ "table_path"="TEST.ALLDATATYPES_SYNONYM"
+ }
+ ]
+ }
+
+}
+
+transform {
+}
+
+sink {
+ Jdbc {
+ url = "jdbc:sap://e2e_saphana:39017"
+ driver = "com.sap.db.jdbc.Driver"
+ connection_check_timeout_sec = 1000
+ user = "SYSTEM"
+ password = "testPassword123"
+ database = "TEST"
+ table = "${table_name}_sink"
+ generate_sink_sql = true
+ schema_save_mode = RECREATE_SCHEMA
+ data_save_mode = DROP_DATA
+ }
+}
+