This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d41af8a6ed [Hotfix][Jdbc] Fix jdbc setFetchSize error (#6005)
d41af8a6ed is described below
commit d41af8a6ed93a82512acb54f17ea703ddd333f5f
Author: hailin0 <[email protected]>
AuthorDate: Mon Dec 18 23:15:24 2023 +0800
[Hotfix][Jdbc] Fix jdbc setFetchSize error (#6005)
Setting Integer.MIN_VALUE is only supported by MySQL
---
.../cdc/sqlserver/source/utils/SqlServerUtils.java | 2 +-
.../jdbc/internal/dialect/JdbcDialect.java | 6 ++--
.../jdbc/internal/dialect/mysql/MysqlDialect.java | 40 ++++++++++++++++++++++
.../connectors/seatunnel/jdbc/JdbcOracleIT.java | 16 +++++++++
4 files changed, 59 insertions(+), 5 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
index d6e58825da..ab0c061d2c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
@@ -165,7 +165,7 @@ public class SqlServerUtils {
.createStatement(
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
- stmt.setFetchSize(Integer.MIN_VALUE);
+ stmt.setFetchSize(1024);
rs = stmt.executeQuery(sampleQuery);
int count = 0;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index aa7c03ccdb..099d59d5c0 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -314,10 +314,8 @@ public interface JdbcDialect extends Serializable {
quoteIdentifier(columnName),
tableIdentifier(table.getTablePath()));
}
- try (Statement stmt =
- connection.createStatement(
- ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY)) {
- stmt.setFetchSize(Integer.MIN_VALUE);
+ try (Statement stmt = connection.createStatement()) {
+ stmt.setFetchSize(1024);
try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
int count = 0;
List<Object> results = new ArrayList<>();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index b22a865a6a..f2ce15e31e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -33,8 +33,10 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -119,6 +121,44 @@ public class MysqlDialect implements JdbcDialect {
return TablePath.of(tablePath, false);
}
+ @Override
+ public Object[] sampleDataFromColumn(
+ Connection connection, JdbcSourceTable table, String columnName,
int samplingRate)
+ throws SQLException {
+ String sampleQuery;
+ if (StringUtils.isNotBlank(table.getQuery())) {
+ sampleQuery =
+ String.format(
+ "SELECT %s FROM (%s) AS T",
+ quoteIdentifier(columnName), table.getQuery());
+ } else {
+ sampleQuery =
+ String.format(
+ "SELECT %s FROM %s",
+ quoteIdentifier(columnName),
tableIdentifier(table.getTablePath()));
+ }
+
+ try (Statement stmt =
+ connection.createStatement(
+ ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY)) {
+ stmt.setFetchSize(Integer.MIN_VALUE);
+ try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
+ int count = 0;
+ List<Object> results = new ArrayList<>();
+
+ while (rs.next()) {
+ count++;
+ if (count % samplingRate == 0) {
+ results.add(rs.getObject(1));
+ }
+ }
+ Object[] resultsArray = results.toArray();
+ Arrays.sort(resultsArray);
+ return resultsArray;
+ }
+ }
+ }
+
@Override
public Long approximateRowCntStatement(Connection connection,
JdbcSourceTable table)
throws SQLException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index ca7bda75c2..a1c2c30fe6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -18,12 +18,17 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLParser;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -35,6 +40,7 @@ import com.google.common.collect.Lists;
import java.math.BigDecimal;
import java.sql.Date;
+import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -95,6 +101,16 @@ public class JdbcOracleIT extends AbstractJdbcIT {
"XML_TYPE_COL"
};
+ @Test
+ public void testSampleDataFromColumnSuccess() throws SQLException {
+ JdbcDialect dialect = new OracleDialect();
+ JdbcSourceTable table =
+ JdbcSourceTable.builder()
+ .tablePath(TablePath.of(null, SCHEMA, SOURCE_TABLE))
+ .build();
+ dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1);
+ }
+
@Override
JdbcCase getJdbcCase() {
Map<String, String> containerEnv = new HashMap<>();