This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d41af8a6ed [Hotfix][Jdbc] Fix jdbc setFetchSize error (#6005)
d41af8a6ed is described below

commit d41af8a6ed93a82512acb54f17ea703ddd333f5f
Author: hailin0 <[email protected]>
AuthorDate: Mon Dec 18 23:15:24 2023 +0800

    [Hotfix][Jdbc] Fix jdbc setFetchSize error (#6005)
    
    Setting Integer.MIN_VALUE is only supported by MySQL
---
 .../cdc/sqlserver/source/utils/SqlServerUtils.java |  2 +-
 .../jdbc/internal/dialect/JdbcDialect.java         |  6 ++--
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  | 40 ++++++++++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcOracleIT.java    | 16 +++++++++
 4 files changed, 59 insertions(+), 5 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
index d6e58825da..ab0c061d2c 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
@@ -165,7 +165,7 @@ public class SqlServerUtils {
                             .createStatement(
                                     ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
 
-            stmt.setFetchSize(Integer.MIN_VALUE);
+            stmt.setFetchSize(1024);
             rs = stmt.executeQuery(sampleQuery);
 
             int count = 0;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index aa7c03ccdb..099d59d5c0 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -314,10 +314,8 @@ public interface JdbcDialect extends Serializable {
                             quoteIdentifier(columnName), tableIdentifier(table.getTablePath()));
         }
 
-        try (Statement stmt =
-                connection.createStatement(
-                        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
-            stmt.setFetchSize(Integer.MIN_VALUE);
+        try (Statement stmt = connection.createStatement()) {
+            stmt.setFetchSize(1024);
             try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
                 int count = 0;
                 List<Object> results = new ArrayList<>();
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index b22a865a6a..f2ce15e31e 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -33,8 +33,10 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -119,6 +121,44 @@ public class MysqlDialect implements JdbcDialect {
         return TablePath.of(tablePath, false);
     }
 
+    @Override
+    public Object[] sampleDataFromColumn(
+            Connection connection, JdbcSourceTable table, String columnName, int samplingRate)
+            throws SQLException {
+        String sampleQuery;
+        if (StringUtils.isNotBlank(table.getQuery())) {
+            sampleQuery =
+                    String.format(
+                            "SELECT %s FROM (%s) AS T",
+                            quoteIdentifier(columnName), table.getQuery());
+        } else {
+            sampleQuery =
+                    String.format(
+                            "SELECT %s FROM %s",
+                            quoteIdentifier(columnName), tableIdentifier(table.getTablePath()));
+        }
+
+        try (Statement stmt =
+                connection.createStatement(
+                        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+            stmt.setFetchSize(Integer.MIN_VALUE);
+            try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
+                int count = 0;
+                List<Object> results = new ArrayList<>();
+
+                while (rs.next()) {
+                    count++;
+                    if (count % samplingRate == 0) {
+                        results.add(rs.getObject(1));
+                    }
+                }
+                Object[] resultsArray = results.toArray();
+                Arrays.sort(resultsArray);
+                return resultsArray;
+            }
+        }
+    }
+
     @Override
     public Long approximateRowCntStatement(Connection connection, JdbcSourceTable table)
             throws SQLException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index ca7bda75c2..a1c2c30fe6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -18,12 +18,17 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLParser;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.OracleContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -35,6 +40,7 @@ import com.google.common.collect.Lists;
 
 import java.math.BigDecimal;
 import java.sql.Date;
+import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -95,6 +101,16 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                 "XML_TYPE_COL"
             };
 
+    @Test
+    public void testSampleDataFromColumnSuccess() throws SQLException {
+        JdbcDialect dialect = new OracleDialect();
+        JdbcSourceTable table =
+                JdbcSourceTable.builder()
+                        .tablePath(TablePath.of(null, SCHEMA, SOURCE_TABLE))
+                        .build();
+        dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1);
+    }
+
     @Override
     JdbcCase getJdbcCase() {
         Map<String, String> containerEnv = new HashMap<>();

Reply via email to