This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0b45bc9090 [Fix][connector-jdbc] Fix CatalogUtils
getCatalogTable(Connection, String, ...) losing primary key for query-only
sources (#10093)
0b45bc9090 is described below
commit 0b45bc90900e14be5d3540a123f61cd915a7212f
Author: yzeng1618 <[email protected]>
AuthorDate: Mon Dec 1 15:58:56 2025 +0800
[Fix][connector-jdbc] Fix CatalogUtils getCatalogTable(Connection, String,
...) losing primary key for query-only sources (#10093)
Co-authored-by: zengyi <[email protected]>
---
docs/en/connector-v2/source/Jdbc.md | 2 +
docs/en/connector-v2/source/Mysql.md | 2 +
docs/zh/connector-v2/source/Jdbc.md | 1 +
docs/zh/connector-v2/source/Mysql.md | 1 +
.../seatunnel/jdbc/catalog/utils/CatalogUtils.java | 60 ++++++-
.../seatunnel/jdbc/source/ChunkSplitter.java | 24 ++-
.../jdbc/catalog/utils/CatalogUtilsTest.java | 92 +++++++++++
.../seatunnel/jdbc/JdbcMySqlCreateTableIT.java | 174 +++++++++++++++++++++
.../connectors/seatunnel/jdbc/JdbcHanaIT.java | 15 ++
9 files changed, 365 insertions(+), 6 deletions(-)
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index f4587cd7f3..42f150c654 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -243,6 +243,8 @@ The JDBC Source connector supports parallel reading of data
from tables. SeaTunn
> If the table can not be split(for example, table have no Primary Key or
> Unique Index, and `partition_column` is not set), it will run in single
> concurrency.
>
> Use `table_path` to replace `query` for single table reading. If you need to
> read multiple tables, use `table_list`.
+>
+> When a primary key is inferred from a `query`, it is inherited from the
underlying table of the first column in the result set. Its uniqueness across
the whole result set is not guaranteed (for example, when the query contains
joins or reads from multiple tables).
## appendix
diff --git a/docs/en/connector-v2/source/Mysql.md
b/docs/en/connector-v2/source/Mysql.md
index 9b295b3ca7..eec04de707 100644
--- a/docs/en/connector-v2/source/Mysql.md
+++ b/docs/en/connector-v2/source/Mysql.md
@@ -176,6 +176,8 @@ How many splits do we need to split into, only support
positive integer. default
> If the table can not be split(for example, table have no Primary Key or
> Unique Index, and `partition_column` is not set), it will run in single
> concurrency.
>
> Use `table_path` to replace `query` for single table reading. If you need to
> read multiple tables, use `table_list`.
+>
+> When a primary key is inferred from a `query`, it is inherited from the
underlying table of the first column in the result set. Its uniqueness across
the whole result set is not guaranteed (for example, when the query contains
joins or reads from multiple tables).
## Task Example
diff --git a/docs/zh/connector-v2/source/Jdbc.md
b/docs/zh/connector-v2/source/Jdbc.md
index 1f3b1acbfe..b62c995454 100644
--- a/docs/zh/connector-v2/source/Jdbc.md
+++ b/docs/zh/connector-v2/source/Jdbc.md
@@ -236,6 +236,7 @@ JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用某些
> 如果表无法分割(例如,表没有主键或唯一索引,且未设置 `partition_column`),它将以单并发运行。
>
> 使用 `table_path` 替换 `query` 进行单表读取。如果需要读取多个表,请使用 `table_list`。
+> 当基于 `query` 推断主键时,主键继承自结果集中第一列所在的底层表;如果 `query` 包含多表 JOIN 或同时从多张表读取,该主键对整个
JOIN 结果集的唯一性不作严格保证。
## 附录
diff --git a/docs/zh/connector-v2/source/Mysql.md
b/docs/zh/connector-v2/source/Mysql.md
index c875fa5ecc..c7ee83b436 100644
--- a/docs/zh/connector-v2/source/Mysql.md
+++ b/docs/zh/connector-v2/source/Mysql.md
@@ -177,6 +177,7 @@ JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用特定
> 如果表无法拆分(例如,表没有主键或唯一索引,且未设置 `partition_column`),则将以单线程并发方式运行。
>
> 使用 `table_path` 替代 `query` 来进行单表读取。如果需要读取多个表,请使用 `table_list`。
+> 当基于 `query` 推断主键时,主键继承自结果集中第一列所在的底层表;如果 `query` 包含多表 JOIN 或同时从多张表读取,该主键对整个
JOIN 结果集的唯一性不作严格保证。
## 任务示例
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
index 478d68700d..b98d7cb977 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
@@ -50,6 +50,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -333,7 +334,36 @@ public class CatalogUtils {
Connection connection, String sqlQuery, JdbcDialectTypeMapper
typeMapper)
throws SQLException {
try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
- return getCatalogTable(ps.getMetaData(), typeMapper, sqlQuery);
+ ResultSetMetaData resultSetMetaData = ps.getMetaData();
+ CatalogTable catalogTable = getCatalogTable(resultSetMetaData,
typeMapper, sqlQuery);
+
+ PrimaryKey primaryKey = extractPrimaryKey(connection,
resultSetMetaData, sqlQuery);
+ if (primaryKey == null) {
+ return catalogTable;
+ }
+
+ Set<String> queryColumns =
+ catalogTable.getTableSchema().getColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toSet());
+ if (!queryColumns.containsAll(primaryKey.getColumnNames())) {
+ return catalogTable;
+ }
+
+ TableSchema newSchema =
+ TableSchema.builder()
+
.columns(catalogTable.getTableSchema().getColumns())
+ .primaryKey(primaryKey)
+
.constraintKey(catalogTable.getTableSchema().getConstraintKeys())
+ .build();
+
+ return CatalogTable.of(
+ catalogTable.getTableId(),
+ newSchema,
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getComment(),
+ catalogTable.getCatalogName());
}
}
@@ -353,4 +383,32 @@ public class CatalogUtils {
return getCatalogTable(resultSetMetaData, sqlQuery);
}
}
+
+ private static PrimaryKey extractPrimaryKey(
+ Connection connection, ResultSetMetaData resultSetMetaData, String
sqlQuery) {
+ try {
+ String tableName = resultSetMetaData.getTableName(1);
+ if (StringUtils.isBlank(tableName)) {
+ return null;
+ }
+
+ String databaseName = resultSetMetaData.getCatalogName(1);
+ String schemaName = resultSetMetaData.getSchemaName(1);
+ DatabaseMetaData dbMetaData = connection.getMetaData();
+
+ TablePath tablePath =
+ TablePath.of(
+ StringUtils.isBlank(databaseName) ? null :
databaseName,
+ StringUtils.isBlank(schemaName) ? null :
schemaName,
+ tableName);
+
+ return getPrimaryKey(dbMetaData, tablePath).orElse(null);
+ } catch (SQLException e) {
+ log.debug(
+ "Failed to extract primary key from database metadata for
sql: {}",
+ sqlQuery,
+ e);
+ return null;
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index 4944886d9a..82c1996cd5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -194,11 +194,12 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
columnName = jdbcDialect.quoteIdentifier(columnName);
columnName = jdbcDialect.convertType(columnName,
column.getSourceType());
- if (StringUtils.isNotBlank(table.getQuery())) {
+ String query = normalizeQuery(table.getQuery());
+ if (StringUtils.isNotBlank(query)) {
minQuery =
String.format(
"SELECT MIN(%s) FROM (%s) tmp WHERE %s > ?",
- columnName, table.getQuery(), columnName);
+ columnName, query, columnName);
} else {
minQuery =
String.format(
@@ -232,11 +233,11 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
columnName = jdbcDialect.quoteIdentifier(columnName);
columnName = jdbcDialect.convertType(columnName,
column.getSourceType());
- if (StringUtils.isNotBlank(table.getQuery())) {
+ String query = normalizeQuery(table.getQuery());
+ if (StringUtils.isNotBlank(query)) {
sqlQuery =
String.format(
- "SELECT MIN(%s), MAX(%s) FROM (%s) tmp",
- columnName, columnName, table.getQuery());
+ "SELECT MIN(%s), MAX(%s) FROM (%s) tmp",
columnName, columnName, query);
} else {
sqlQuery =
String.format(
@@ -260,6 +261,11 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
}
protected Optional<SeaTunnelRowType> findSplitKey(JdbcSourceTable table) {
+ if (StringUtils.isNotBlank(table.getQuery()) &&
table.getPartitionColumn() == null) {
+ // Keep query-based tables on single split unless user explicitly
sets partition column
+ return Optional.empty();
+ }
+
TableSchema schema = table.getCatalogTable().getTableSchema();
List<Column> columns = schema.getColumns();
Map<String, Column> columnMap =
@@ -350,6 +356,14 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
}
}
+ private String normalizeQuery(String query) {
+ if (StringUtils.isEmpty(query)) {
+ return query;
+ }
+ // Avoid trailing semicolons/whitespace breaking wrapped subqueries
+ return StringUtils.stripEnd(query, " \t\r\n;");
+ }
+
protected String createSplitId(TablePath tablePath, int index) {
return String.format("%s-%s", tablePath, index);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
index f4e132e734..2ae9df7951 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
@@ -30,10 +31,16 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class CatalogUtilsTest {
@Test
@@ -83,4 +90,89 @@ public class CatalogUtilsTest {
});
Assertions.assertEquals("id comment",
tableSchema2.getColumns().get(0).getComment());
}
+
+ @Test
+ void testGetCatalogTableWithPrimaryKeyFromQuery() throws SQLException {
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
+
+ when(connection.prepareStatement("select id, name from test_table"))
+ .thenReturn(preparedStatement);
+ when(preparedStatement.getMetaData()).thenReturn(resultSetMetaData);
+
+ when(resultSetMetaData.getColumnCount()).thenReturn(2);
+ when(resultSetMetaData.getColumnLabel(1)).thenReturn("id");
+ when(resultSetMetaData.getColumnLabel(2)).thenReturn("name");
+ when(resultSetMetaData.getTableName(1)).thenReturn("test_table");
+ when(resultSetMetaData.getCatalogName(1)).thenReturn("test_db");
+ when(resultSetMetaData.getSchemaName(1)).thenReturn(null);
+
when(resultSetMetaData.isNullable(1)).thenReturn(ResultSetMetaData.columnNullable);
+
when(resultSetMetaData.isNullable(2)).thenReturn(ResultSetMetaData.columnNullable);
+
+ when(connection.getMetaData()).thenReturn(new TestDatabaseMetaData());
+
+ JdbcDialectTypeMapper typeMapper =
+ new JdbcDialectTypeMapper() {
+ @Override
+ public Column mappingColumn(BasicTypeDefine typeDefine) {
+ return PhysicalColumn.of(
+ typeDefine.getName(),
+ BasicType.VOID_TYPE,
+ typeDefine.getLength(),
+ typeDefine.isNullable(),
+ null,
+ null);
+ }
+ };
+
+ CatalogTable catalogTable =
+ CatalogUtils.getCatalogTable(
+ connection, "select id, name from test_table",
typeMapper);
+
+ PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+ Assertions.assertNotNull(primaryKey);
+ Assertions.assertEquals("testfdawe_", primaryKey.getPrimaryKey());
+ Assertions.assertEquals(1, primaryKey.getColumnNames().size());
+ Assertions.assertEquals("id", primaryKey.getColumnNames().get(0));
+ }
+
+ @Test
+ void testGetCatalogTableNotApplyPrimaryKeyWhenMissingColumns() throws
SQLException {
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
+
+ when(connection.prepareStatement("select name from test_table"))
+ .thenReturn(preparedStatement);
+ when(preparedStatement.getMetaData()).thenReturn(resultSetMetaData);
+
+ when(resultSetMetaData.getColumnCount()).thenReturn(1);
+ when(resultSetMetaData.getColumnLabel(1)).thenReturn("name");
+ when(resultSetMetaData.getTableName(1)).thenReturn("test_table");
+ when(resultSetMetaData.getCatalogName(1)).thenReturn("test_db");
+ when(resultSetMetaData.getSchemaName(1)).thenReturn(null);
+
when(resultSetMetaData.isNullable(1)).thenReturn(ResultSetMetaData.columnNullable);
+
+ when(connection.getMetaData()).thenReturn(new TestDatabaseMetaData());
+
+ JdbcDialectTypeMapper typeMapper =
+ new JdbcDialectTypeMapper() {
+ @Override
+ public Column mappingColumn(BasicTypeDefine typeDefine) {
+ return PhysicalColumn.of(
+ typeDefine.getName(),
+ BasicType.VOID_TYPE,
+ typeDefine.getLength(),
+ typeDefine.isNullable(),
+ null,
+ null);
+ }
+ };
+
+ CatalogTable catalogTable =
+ CatalogUtils.getCatalogTable(connection, "select name from
test_table", typeMapper);
+
+ Assertions.assertNull(catalogTable.getTableSchema().getPrimaryKey());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
index e7177286ba..5b5ced1a1c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
@@ -27,6 +29,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerCatalog;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -54,6 +58,8 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Set;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
@Slf4j
@@ -302,6 +308,174 @@ public class JdbcMySqlCreateTableIT extends TestSuiteBase
implements TestResourc
// delete table
}
+ @Test
+ public void testGetCatalogTablePrimaryKeyFromQuery() throws SQLException {
+ try (Connection connection = getJdbcMySqlConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute(
+ "CREATE TABLE IF NOT EXISTS mysql_pk_e2e(\n"
+ + "id int NOT NULL PRIMARY KEY,\n"
+ + "name varchar(100) NULL\n"
+ + ");");
+ }
+
+ JdbcDialectTypeMapper typeMapper =
+ new JdbcDialectTypeMapper() {
+ @Override
+ public org.apache.seatunnel.api.table.catalog.Column
mappingColumn(
+
org.apache.seatunnel.api.table.converter.BasicTypeDefine
+ typeDefine) {
+ return
org.apache.seatunnel.api.table.catalog.PhysicalColumn.of(
+ typeDefine.getName(),
+
org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE,
+ typeDefine.getLength(),
+ typeDefine.isNullable(),
+ typeDefine.getScale(),
+ typeDefine.getComment());
+ }
+ };
+
+ CatalogTable catalogTable =
+ CatalogUtils.getCatalogTable(
+ connection,
+ "select id, name from mysql_pk_e2e where id >= 0",
+ typeMapper);
+
+ PrimaryKey primaryKey =
catalogTable.getTableSchema().getPrimaryKey();
+ Assertions.assertNotNull(primaryKey);
+ Assertions.assertTrue(primaryKey.getColumnNames().contains("id"));
+
+ Set<String> columnNames =
+ catalogTable.getTableSchema().getColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toSet());
+ Assertions.assertTrue(columnNames.contains("id"));
+ Assertions.assertTrue(columnNames.contains("name"));
+ }
+ }
+
+ @Test
+ public void testGetCatalogTablePrimaryKeyFromGroupByQuery() throws
SQLException {
+ try (Connection connection = getJdbcMySqlConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute(
+ "CREATE TABLE IF NOT EXISTS orders_group_by_e2e("
+ + "id INT NOT NULL PRIMARY KEY,"
+ + "order_date DATE,"
+ + "total_amount DECIMAL(10,2)"
+ + ")");
+ statement.execute(
+ "INSERT INTO orders_group_by_e2e(id, order_date,
total_amount) VALUES "
+ + "(1,'2023-01-01',100.00),"
+ + "(2,'2023-01-02',50.00),"
+ + "(3,'2023-02-01',30.00)");
+ }
+
+ JdbcDialectTypeMapper typeMapper =
+ new JdbcDialectTypeMapper() {
+ @Override
+ public org.apache.seatunnel.api.table.catalog.Column
mappingColumn(
+
org.apache.seatunnel.api.table.converter.BasicTypeDefine
+ typeDefine) {
+ return
org.apache.seatunnel.api.table.catalog.PhysicalColumn.of(
+ typeDefine.getName(),
+
org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE,
+ typeDefine.getLength(),
+ typeDefine.isNullable(),
+ typeDefine.getScale(),
+ typeDefine.getComment());
+ }
+ };
+
+ String sql =
+ "SELECT id, COUNT(*) AS order_cnt "
+ + "FROM orders_group_by_e2e "
+ + "WHERE order_date >= '2023-01-01' "
+ + "GROUP BY id";
+
+ CatalogTable catalogTable =
CatalogUtils.getCatalogTable(connection, sql, typeMapper);
+
+ PrimaryKey primaryKey =
catalogTable.getTableSchema().getPrimaryKey();
+ Assertions.assertNotNull(primaryKey);
+ Assertions.assertEquals(1, primaryKey.getColumnNames().size());
+ Assertions.assertEquals("id", primaryKey.getColumnNames().get(0));
+
+ Set<String> columnNames =
+ catalogTable.getTableSchema().getColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toSet());
+ Assertions.assertTrue(columnNames.contains("id"));
+ Assertions.assertTrue(columnNames.contains("order_cnt"));
+ }
+ }
+
+ @Test
+ public void testGetCatalogTablePrimaryKeyFromJoinQuery() throws
SQLException {
+ try (Connection connection = getJdbcMySqlConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute(
+ "CREATE TABLE IF NOT EXISTS users_join_e2e("
+ + "id INT NOT NULL PRIMARY KEY,"
+ + "user_name VARCHAR(100),"
+ + "city VARCHAR(100)"
+ + ")");
+ statement.execute(
+ "CREATE TABLE IF NOT EXISTS orders_join_e2e("
+ + "order_id INT NOT NULL PRIMARY KEY,"
+ + "user_id INT,"
+ + "order_date DATE,"
+ + "total_amount DECIMAL(10,2)"
+ + ")");
+ statement.execute(
+ "INSERT INTO users_join_e2e(id, user_name, city)
VALUES "
+ + "(1,'user1','Beijing'),"
+ + "(2,'user2','Shanghai')");
+ statement.execute(
+ "INSERT INTO orders_join_e2e(order_id, user_id,
order_date, total_amount) VALUES "
+ + "(100,1,'2023-01-01',100.00)");
+ }
+
+ JdbcDialectTypeMapper typeMapper =
+ new JdbcDialectTypeMapper() {
+ @Override
+ public org.apache.seatunnel.api.table.catalog.Column
mappingColumn(
+
org.apache.seatunnel.api.table.converter.BasicTypeDefine
+ typeDefine) {
+ return
org.apache.seatunnel.api.table.catalog.PhysicalColumn.of(
+ typeDefine.getName(),
+
org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE,
+ typeDefine.getLength(),
+ typeDefine.isNullable(),
+ typeDefine.getScale(),
+ typeDefine.getComment());
+ }
+ };
+
+ String sql =
+ "SELECT o.order_id, u.id, u.user_name, u.city "
+ + "FROM orders_join_e2e o "
+ + "INNER JOIN users_join_e2e u ON o.user_id = u.id
"
+ + "WHERE o.order_date >= '2023-01-01'";
+
+ CatalogTable catalogTable =
CatalogUtils.getCatalogTable(connection, sql, typeMapper);
+
+ PrimaryKey primaryKey =
catalogTable.getTableSchema().getPrimaryKey();
+ // complex join query should still infer primary key from main
table
+ Assertions.assertNotNull(primaryKey);
+ Assertions.assertEquals(1, primaryKey.getColumnNames().size());
+ Assertions.assertEquals("order_id",
primaryKey.getColumnNames().get(0));
+
+ Set<String> columnNames =
+ catalogTable.getTableSchema().getColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toSet());
+ Assertions.assertTrue(columnNames.contains("order_id"));
+ Assertions.assertTrue(columnNames.contains("id"));
+ Assertions.assertTrue(columnNames.contains("user_name"));
+ Assertions.assertTrue(columnNames.contains("city"));
+ }
+ }
+
@Override
public void tearDown() throws Exception {
if (sqlserver_container != null) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
index 8a7cb2efc6..83b38fee2f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java
@@ -284,4 +284,19 @@ public class JdbcHanaIT extends AbstractJdbcIT {
Assertions.assertEquals(1, columnNames.size());
Assertions.assertEquals(25,
catalogTable.getTableSchema().getColumns().size());
}
+
+ @SneakyThrows
+ @Test
+ public void testCatalogWithQuery() {
+ String query =
+ String.format("SELECT * FROM %s",
buildTableInfoWithSchema(DATABASE, SOURCE_TABLE));
+
+ CatalogTable catalogTable =
+ CatalogUtils.getCatalogTable(connection, query, new
SapHanaTypeMapper());
+
+
Assertions.assertNotNull(catalogTable.getTableSchema().getPrimaryKey());
+ Assertions.assertEquals(
+ 1,
catalogTable.getTableSchema().getPrimaryKey().getColumnNames().size());
+ Assertions.assertEquals(25,
catalogTable.getTableSchema().getColumns().size());
+ }
}