This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7e0c20a488 [Feature][Connector-V2] jdbc saphana source tablepath 
support view and  synonym (#7670)
7e0c20a488 is described below

commit 7e0c20a488a86e6a0108d05aa8f5688c7aa0ca30
Author: 老王 <[email protected]>
AuthorDate: Wed Sep 18 19:20:01 2024 +0800

    [Feature][Connector-V2] jdbc saphana source tablepath support view and  
synonym (#7670)
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          |  36 +++++++
 .../jdbc/catalog/saphana/SapHanaCatalog.java       | 115 +++++++++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcHanaIT.java      |  46 ++++++++-
 .../jdbc_sap_hana_test_view_and_synonym.conf       |  59 +++++++++++
 4 files changed, 255 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index e971c13893..260be79042 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -260,6 +260,14 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Builds the SQL statement that lists the views of a database.
+     *
+     * <p>This base implementation always throws; a catalog that can list views overrides it.
+     *
+     * @param databaseName name of the database whose views should be listed
+     * @return never returns normally in this base implementation
+     * @throws UnsupportedOperationException always
+     */
+    protected String getListViewSql(String databaseName) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Builds the SQL statement that lists the synonyms of a database.
+     *
+     * <p>This base implementation always throws; a catalog that can list synonyms overrides it.
+     *
+     * @param databaseName name of the database whose synonyms should be listed
+     * @return never returns normally in this base implementation
+     * @throws UnsupportedOperationException always
+     */
+    protected String getListSynonymSql(String databaseName) {
+        throw new UnsupportedOperationException();
+    }
+
     protected String getDatabaseWithConditionSql(String databaseName) {
         throw CommonError.unsupportedMethod(this.catalogName, 
"getDatabaseWithConditionSql");
     }
@@ -331,6 +339,34 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         }
     }
 
+    /**
+     * Lists the names of the views in the given database.
+     *
+     * <p>The query comes from {@link #getListViewSql(String)}; each result row is mapped with
+     * {@code getTableName}.
+     *
+     * @param databaseName database whose views are listed
+     * @return the view names produced by the query
+     * @throws DatabaseNotExistException if {@code databaseExists(databaseName)} is false
+     * @throws CatalogException if building or running the query fails for any reason,
+     *     including a catalog that does not override {@code getListViewSql}
+     */
+    public List<String> listViews(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+        String dbUrl = getUrlFromDatabaseName(databaseName);
+        try {
+            return queryString(dbUrl, getListViewSql(databaseName), this::getTableName);
+        } catch (Exception e) {
+            // Name the operation and the database so the failure is not mistaken for a
+            // database-listing error.
+            throw new CatalogException(
+                    String.format(
+                            "Failed listing views of database %s in catalog %s",
+                            databaseName, catalogName),
+                    e);
+        }
+    }
+
+    /**
+     * Lists the names of the synonyms in the given database.
+     *
+     * <p>The query comes from {@link #getListSynonymSql(String)}; each result row is mapped
+     * with {@code getTableName}.
+     *
+     * @param databaseName database whose synonyms are listed
+     * @return the synonym names produced by the query
+     * @throws DatabaseNotExistException if {@code databaseExists(databaseName)} is false
+     * @throws CatalogException if building or running the query fails for any reason,
+     *     including a catalog that does not override {@code getListSynonymSql}
+     */
+    public List<String> listSynonym(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+        String dbUrl = getUrlFromDatabaseName(databaseName);
+        try {
+            return queryString(dbUrl, getListSynonymSql(databaseName), this::getTableName);
+        } catch (Exception e) {
+            // Name the operation and the database so the failure is not mistaken for a
+            // database-listing error.
+            throw new CatalogException(
+                    String.format(
+                            "Failed listing synonyms of database %s in catalog %s",
+                            databaseName, catalogName),
+                    e);
+        }
+    }
+
     @Override
     public boolean tableExists(TablePath tablePath) throws CatalogException {
         String databaseName = tablePath.getDatabaseName();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
index 70b01b397e..fce7e78eeb 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
@@ -21,22 +21,35 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.saphana;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper;
 
+import org.apache.commons.lang3.StringUtils;
+
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter.appendColumnSizeIfNeed;
 
@@ -123,6 +136,18 @@ public class SapHanaCatalog extends AbstractJdbcCatalog {
                 "SELECT TABLE_NAME FROM TABLES WHERE SCHEMA_NAME = '%s'", 
databaseName);
     }
 
+    /**
+     * Builds the query that selects every view name of one schema from {@code SYS.VIEWS}.
+     *
+     * @param databaseName schema name; it is placed into the SQL text as-is
+     * @return the SQL text
+     */
+    @Override
+    public String getListViewSql(String databaseName) {
+        final String template = "SELECT VIEW_NAME FROM SYS.VIEWS WHERE SCHEMA_NAME = '%s'";
+        return String.format(template, databaseName);
+    }
+
+    /**
+     * Builds the query that selects every synonym name of one schema from {@code SYNONYMS}.
+     *
+     * @param databaseName schema name; it is placed into the SQL text as-is
+     * @return the SQL text
+     */
+    @Override
+    public String getListSynonymSql(String databaseName) {
+        final String template = "SELECT SYNONYM_NAME FROM SYNONYMS WHERE SCHEMA_NAME = '%s'";
+        return String.format(template, databaseName);
+    }
+
     @Override
     protected String getTableName(ResultSet rs) throws SQLException {
         return rs.getString(1);
@@ -134,6 +159,96 @@ public class SapHanaCatalog extends AbstractJdbcCatalog {
                 SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(), 
tablePath.getTableName());
     }
 
+    /**
+     * Checks whether {@code tablePath} names an existing table, view or synonym.
+     *
+     * <p>The table lookup always runs. Views and synonyms are looked up by schema, so they are
+     * only consulted when the path carries a non-blank database name.
+     *
+     * @param tablePath path of the object to look for
+     * @return {@code true} if a table, view or synonym matches; {@code false} if none does or
+     *     a {@link DatabaseNotExistException} is raised during the lookup
+     * @throws SeaTunnelException if a lookup query fails with a {@link SQLException}
+     */
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        String databaseName = tablePath.getDatabaseName();
+        try {
+            // Every probe runs against the same URL, so resolve it once.
+            String dbUrl = this.getUrlFromDatabaseName(databaseName);
+            if (querySQLResultExists(dbUrl, getTableWithConditionSql(tablePath))) {
+                return true;
+            }
+            if (StringUtils.isBlank(databaseName)) {
+                return false;
+            }
+            // Views and synonyms are matched on the bare object name, the same name getTable
+            // uses when it resolves a synonym. The condition is concatenated rather than
+            // formatted so a '%' in the schema name cannot be read as a format specifier.
+            String tableName = tablePath.getTableName();
+            return querySQLResultExists(
+                            dbUrl,
+                            getListViewSql(databaseName)
+                                    + " AND VIEW_NAME = '"
+                                    + tableName
+                                    + "'")
+                    || querySQLResultExists(
+                            dbUrl,
+                            getListSynonymSql(databaseName)
+                                    + " AND SYNONYM_NAME = '"
+                                    + tableName
+                                    + "'");
+        } catch (DatabaseNotExistException e) {
+            return false;
+        } catch (SQLException e) {
+            throw new SeaTunnelException("Failed to querySQLResult", e);
+        }
+    }
+
+    /**
+     * Loads the schema of the table, view or synonym at {@code tablePath}.
+     *
+     * <p>If the path names a synonym, columns and keys are read from the object the synonym
+     * points at, while the returned table keeps the identifier of the requested path.
+     *
+     * @param tablePath path of the table, view or synonym
+     * @return the catalog table describing the resolved object
+     * @throws TableNotExistException if {@link #tableExists(TablePath)} is false
+     * @throws CatalogException if the synonym lookup or the metadata queries fail
+     */
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+        String databaseName = tablePath.getDatabaseName();
+        boolean hasDatabase = StringUtils.isNotBlank(databaseName);
+        Connection conn =
+                getConnection(getUrlFromDatabaseName(hasDatabase ? databaseName : defaultDatabase));
+        TablePath originalTablePath = tablePath;
+        // A synonym is identified by schema and name, so without a schema there is nothing to
+        // resolve and the path is used as given.
+        if (hasDatabase) {
+            tablePath = resolveSynonym(conn, tablePath);
+        }
+        try {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData, tablePath);
+            List<ConstraintKey> constraintKeys = getConstraintKeys(metaData, tablePath);
+            try (PreparedStatement ps = conn.prepareStatement(getSelectColumnsSql(tablePath));
+                    ResultSet resultSet = ps.executeQuery()) {
+
+                TableSchema.Builder builder = TableSchema.builder();
+                buildColumnsWithErrorCheck(tablePath, resultSet, builder);
+                // add primary key
+                primaryKey.ifPresent(builder::primaryKey);
+                // add constraint key
+                constraintKeys.forEach(builder::constraintKey);
+                // The identifier reflects what the caller asked for, not the synonym target.
+                TableIdentifier tableIdentifier = getTableIdentifier(originalTablePath);
+                return CatalogTable.of(
+                        tableIdentifier,
+                        builder.build(),
+                        buildConnectorOptions(tablePath),
+                        Collections.emptyList(),
+                        "",
+                        catalogName);
+            }
+        } catch (SeaTunnelRuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    /**
+     * Returns the path a synonym points at, or {@code tablePath} itself when no synonym with
+     * that schema and name is found.
+     */
+    private TablePath resolveSynonym(Connection conn, TablePath tablePath) {
+        String sql =
+                "SELECT OBJECT_SCHEMA, OBJECT_NAME FROM SYNONYMS"
+                        + " WHERE SCHEMA_NAME = ? AND SYNONYM_NAME = ?";
+        try (PreparedStatement statement = conn.prepareStatement(sql)) {
+            // Bind the names instead of formatting them into the SQL text.
+            statement.setString(1, tablePath.getDatabaseName());
+            statement.setString(2, tablePath.getTableName());
+            try (ResultSet resultSet = statement.executeQuery()) {
+                if (resultSet.next()) {
+                    return TablePath.of(
+                            resultSet.getString("OBJECT_SCHEMA"),
+                            resultSet.getString("OBJECT_NAME"));
+                }
+                return tablePath;
+            }
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting SYNONYM %s", tablePath.getFullName()), e);
+        }
+    }
+
     @Override
     protected Column buildColumn(ResultSet resultSet) throws SQLException {
         String columnName = resultSet.getString("COLUMN_NAME");
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
index d2b0667795..49b4cb1763 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper;
 
@@ -38,6 +39,7 @@ import com.google.common.collect.Lists;
 import lombok.SneakyThrows;
 
 import java.sql.Date;
+import java.sql.Statement;
 import java.time.Duration;
 import java.time.LocalDate;
 import java.time.temporal.ChronoUnit;
@@ -56,7 +58,9 @@ public class JdbcHanaIT extends AbstractJdbcIT {
     private static final String SOURCE_TABLE = "ALLDATATYPES";
 
     private static final List<String> CONFIG_FILE =
-            Lists.newArrayList("/jdbc_sap_hana_source_and_sink.conf");
+            Lists.newArrayList(
+                    "/jdbc_sap_hana_source_and_sink.conf",
+                    "/jdbc_sap_hana_test_view_and_synonym.conf");
 
     // TODO The current Docker image cannot handle the annotated type normally,
     //  but the corresponding type can be handled normally on the standard 
HANA service
@@ -214,6 +218,46 @@ public class JdbcHanaIT extends AbstractJdbcIT {
         }
     }
 
+    /**
+     * Creates the source table, the sink table (unless save mode creates it), and the view
+     * and synonym over the source table that the view/synonym job configuration reads.
+     */
+    protected void createNeededTables() {
+        try (Statement stmt = connection.createStatement()) {
+            final String sourceTemplate = jdbcCase.getCreateSql();
+            final String sourceTableInfo =
+                    buildTableInfoWithSchema(
+                            jdbcCase.getDatabase(),
+                            jdbcCase.getSchema(),
+                            jdbcCase.getSourceTable());
+            stmt.execute(String.format(sourceTemplate, sourceTableInfo));
+
+            if (!jdbcCase.isUseSaveModeCreateTable()) {
+                // Fall back to the source DDL template when no sink-specific one is set.
+                final String sinkTemplate =
+                        jdbcCase.getSinkCreateSql() != null
+                                ? jdbcCase.getSinkCreateSql()
+                                : sourceTemplate;
+                final String sinkTableInfo =
+                        buildTableInfoWithSchema(
+                                jdbcCase.getDatabase(),
+                                jdbcCase.getSchema(),
+                                jdbcCase.getSinkTable());
+                stmt.execute(String.format(sinkTemplate, sinkTableInfo));
+            }
+
+            // View and synonym that both point at the source table.
+            stmt.execute("CREATE VIEW TEST.ALLDATATYPES_VIEW AS SELECT * FROM TEST.ALLDATATYPES;");
+            stmt.execute("CREATE SYNONYM TEST.ALLDATATYPES_SYNONYM FOR TEST.ALLDATATYPES;");
+            connection.commit();
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
+        }
+    }
+
     @Override
     GenericContainer<?> initContainer() {
         GenericContainer<?> container =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/resources/jdbc_sap_hana_test_view_and_synonym.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/resources/jdbc_sap_hana_test_view_and_synonym.conf
new file mode 100644
index 0000000000..aec552c208
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/resources/jdbc_sap_hana_test_view_and_synonym.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Batch job that reads a SAP HANA view and a synonym through the JDBC source
+# and writes each of them to its own sink table.
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    url = "jdbc:sap://e2e_saphana:39017"
+    driver = "com.sap.db.jdbc.Driver"
+    connection_check_timeout_sec = 1000
+    user = "SYSTEM"
+    password = "testPassword123"
+    # Both entries are created over TEST.ALLDATATYPES by JdbcHanaIT.createNeededTables.
+    "table_list"=[
+            {
+                "table_path"="TEST.ALLDATATYPES_VIEW"
+            },
+            {
+                "table_path"="TEST.ALLDATATYPES_SYNONYM"
+            }
+        ]
+  }
+
+}
+
+transform {
+}
+
+sink {
+  Jdbc {
+    url = "jdbc:sap://e2e_saphana:39017"
+    driver = "com.sap.db.jdbc.Driver"
+    connection_check_timeout_sec = 1000
+    user = "SYSTEM"
+    password = "testPassword123"
+    database = "TEST"
+    # One sink table per source entry, named after the source table with a "_sink" suffix.
+    table = "${table_name}_sink"
+    generate_sink_sql = true
+    schema_save_mode = RECREATE_SCHEMA
+    data_save_mode = DROP_DATA
+  }
+}
+

Reply via email to