This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e1be9d7f8a [Feature][Connector-V2][CDC] Support string type shard fields. (#5147)
e1be9d7f8a is described below

commit e1be9d7f8a2c705712f9311a732560c1247de6d3
Author: ic4y <[email protected]>
AuthorDate: Wed Jul 26 21:09:35 2023 +0800

    [Feature][Connector-V2][CDC] Support string type shard fields. (#5147)
    
    * [feature][CDC base] Supports string type shard fields
    
    * Delete invalid code
---
 .../splitter/AbstractJdbcSourceChunkSplitter.java} | 317 +++++++++++----------
 .../splitter/JdbcSourceChunkSplitter.java          |  45 +--
 .../connectors/cdc/base/utils/ObjectUtils.java     |   2 +
 .../mysql/source/eumerator/MySqlChunkSplitter.java | 309 +-------------------
 .../source/eumerator/SqlServerChunkSplitter.java   | 304 +-------------------
 5 files changed, 168 insertions(+), 809 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
similarity index 71%
copy from seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
copy to seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index ac0b8165db..e956b11170 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -15,21 +15,19 @@
  * limitations under the License.
  */
 
-package 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.eumerator;
+package org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkRange;
-import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.JdbcSourceChunkSplitter;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
-import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils;
-import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerUtils;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 
@@ -40,18 +38,19 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 
 import static java.math.BigDecimal.ROUND_CEILING;
 import static 
org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
 
-/** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
 @Slf4j
-public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
+public abstract class AbstractJdbcSourceChunkSplitter implements 
JdbcSourceChunkSplitter {
 
     private final JdbcSourceConfig sourceConfig;
     private final JdbcDataSourceDialect dialect;
 
-    public SqlServerChunkSplitter(JdbcSourceConfig sourceConfig, 
JdbcDataSourceDialect dialect) {
+    public AbstractJdbcSourceChunkSplitter(
+            JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
         this.sourceConfig = sourceConfig;
         this.dialect = dialect;
     }
@@ -59,7 +58,6 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
     @Override
     public Collection<SnapshotSplit> generateSplits(TableId tableId) {
         try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
             log.info("Start splitting table {} into chunks...", tableId);
             long start = System.currentTimeMillis();
 
@@ -100,66 +98,6 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
         }
     }
 
-    @Override
-    public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String 
columnName)
-            throws SQLException {
-        return SqlServerUtils.queryMinMax(jdbc, tableId, columnName);
-    }
-
-    @Override
-    public Object queryMin(
-            JdbcConnection jdbc, TableId tableId, String columnName, Object 
excludedLowerBound)
-            throws SQLException {
-        return SqlServerUtils.queryMin(jdbc, tableId, columnName, 
excludedLowerBound);
-    }
-
-    @Override
-    public Object[] sampleDataFromColumn(
-            JdbcConnection jdbc, TableId tableId, String columnName, int 
inverseSamplingRate)
-            throws SQLException {
-        return SqlServerUtils.sampleDataFromColumn(jdbc, tableId, columnName, 
inverseSamplingRate);
-    }
-
-    @Override
-    public Object queryNextChunkMax(
-            JdbcConnection jdbc,
-            TableId tableId,
-            String columnName,
-            int chunkSize,
-            Object includedLowerBound)
-            throws SQLException {
-        return SqlServerUtils.queryNextChunkMax(
-                jdbc, tableId, columnName, chunkSize, includedLowerBound);
-    }
-
-    @Override
-    public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) 
throws SQLException {
-        return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
-    }
-
-    @Override
-    public String buildSplitScanQuery(
-            TableId tableId,
-            SeaTunnelRowType splitKeyType,
-            boolean isFirstSplit,
-            boolean isLastSplit) {
-        return SqlServerUtils.buildSplitScanQuery(tableId, splitKeyType, 
isFirstSplit, isLastSplit);
-    }
-
-    @Override
-    public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
-        return SqlServerTypeUtils.convertFromColumn(splitColumn);
-    }
-
-    // 
--------------------------------------------------------------------------------------------
-    // Utilities
-    // 
--------------------------------------------------------------------------------------------
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
-     * many queries and is not efficient.
-     */
     private List<ChunkRange> splitTableIntoChunks(
             JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
         final String splitColumnName = splitColumn.name();
@@ -191,20 +129,26 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
                         tableId, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
             } else {
                 int shardCount = (int) (approximateRowCnt / chunkSize);
+                int inverseSamplingRate = 
sourceConfig.getInverseSamplingRate();
                 if (sourceConfig.getSampleShardingThreshold() < shardCount) {
+                    // It is necessary to ensure that the number of data rows 
sampled by the
+                    // sampling rate is greater than the number of shards.
+                    // Otherwise, if the sampling rate is too low, it may 
result in an insufficient
+                    // number of data rows for the shards, leading to an 
inadequate number of
+                    // shards.
+                    // Therefore, inverseSamplingRate should be less than 
chunkSize
+                    if (inverseSamplingRate > chunkSize) {
+                        log.warn(
+                                "The inverseSamplingRate is {}, which is 
greater than chunkSize {}, so we set inverseSamplingRate to chunkSize",
+                                inverseSamplingRate,
+                                chunkSize);
+                        inverseSamplingRate = chunkSize;
+                    }
                     Object[] sample =
                             sampleDataFromColumn(
-                                    jdbc,
-                                    tableId,
-                                    splitColumnName,
-                                    sourceConfig.getInverseSamplingRate());
-                    // In order to prevent data loss due to the absence of the 
minimum value in the
-                    // sampled data, the minimum value is directly added here.
-                    Object[] newSample = new Object[sample.length + 1];
-                    newSample[0] = min;
-                    System.arraycopy(sample, 0, newSample, 1, sample.length);
+                                    jdbc, tableId, splitColumnName, 
inverseSamplingRate);
                     return efficientShardingThroughSampling(
-                            tableId, newSample, approximateRowCnt, shardCount);
+                            tableId, sample, approximateRowCnt, shardCount);
                 }
                 return splitUnevenlySizedChunks(
                         jdbc, tableId, splitColumnName, min, max, chunkSize);
@@ -214,7 +158,58 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
         }
     }
 
-    private List<ChunkRange> efficientShardingThroughSampling(
+    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
+    protected List<ChunkRange> splitUnevenlySizedChunks(
+            JdbcConnection jdbc,
+            TableId tableId,
+            String splitColumnName,
+            Object min,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        log.info(
+                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, 
max, chunkSize);
+        int count = 0;
+        while (chunkEnd != null && ObjectCompare(chunkEnd, max) <= 0) {
+            // we start from [null, min + chunk_size) and avoid [null, min)
+            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            // may sleep a while to avoid DDOS on MySQL server
+            maySleep(count++, tableId);
+            chunkStart = chunkEnd;
+            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, 
max, chunkSize);
+        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
+        return splits;
+    }
+
+    protected Object nextChunkEnd(
+            JdbcConnection jdbc,
+            Object previousChunkEnd,
+            TableId tableId,
+            String splitColumnName,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        // chunk end might be null when max values are removed
+        Object chunkEnd =
+                queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, 
previousChunkEnd);
+        if (Objects.equals(previousChunkEnd, chunkEnd)) {
+            // we don't allow equal chunk start and end,
+            // should query the next one larger than chunkEnd
+            chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
+        }
+        if (ObjectCompare(chunkEnd, max) >= 0) {
+            return null;
+        } else {
+            return chunkEnd;
+        }
+    }
+
+    protected List<ChunkRange> efficientShardingThroughSampling(
             TableId tableId, Object[] sampleData, long approximateRowCnt, int 
shardCount) {
         log.info(
                 "Use efficient sharding through sampling optimization for 
table {}, the approximate row count is {}, the shardCount is {}",
@@ -224,16 +219,31 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
 
         final List<ChunkRange> splits = new ArrayList<>();
 
-        // Calculate the shard boundaries
-        for (int i = 0; i < shardCount; i++) {
-            Object chunkStart = sampleData[(int) ((long) i * sampleData.length 
/ shardCount)];
-            Object chunkEnd =
-                    i < shardCount - 1
-                            ? sampleData[(int) (((long) i + 1) * 
sampleData.length / shardCount)]
-                            : null;
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+        if (shardCount == 0) {
+            splits.add(ChunkRange.of(null, null));
+            return splits;
         }
 
+        double approxSamplePerShard = (double) sampleData.length / shardCount;
+
+        if (approxSamplePerShard <= 1) {
+
+            splits.add(ChunkRange.of(null, sampleData[0]));
+            for (int i = 0; i < sampleData.length - 1; i++) {
+                splits.add(ChunkRange.of(sampleData[i], sampleData[i + 1]));
+            }
+            splits.add(ChunkRange.of(sampleData[sampleData.length - 1], null));
+        } else {
+            // Calculate the shard boundaries
+            for (int i = 0; i < shardCount; i++) {
+                Object chunkStart = i == 0 ? null : sampleData[(int) (i * 
approxSamplePerShard)];
+                Object chunkEnd =
+                        i < shardCount - 1
+                                ? sampleData[(int) ((i + 1) * 
approxSamplePerShard)]
+                                : null;
+                splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            }
+        }
         return splits;
     }
 
@@ -241,7 +251,7 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
      * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
      * and tumble chunks in step size.
      */
-    private List<ChunkRange> splitEvenlySizedChunks(
+    protected List<ChunkRange> splitEvenlySizedChunks(
             TableId tableId,
             Object min,
             Object max,
@@ -262,7 +272,7 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
         final List<ChunkRange> splits = new ArrayList<>();
         Object chunkStart = null;
         Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
-        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
+        while (ObjectCompare(chunkEnd, max) <= 0) {
             splits.add(ChunkRange.of(chunkStart, chunkEnd));
             chunkStart = chunkEnd;
             try {
@@ -277,75 +287,10 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
         return splits;
     }
 
-    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
-    private List<ChunkRange> splitUnevenlySizedChunks(
-            JdbcConnection jdbc,
-            TableId tableId,
-            String splitColumnName,
-            Object min,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        log.info(
-                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, 
max, chunkSize);
-        int count = 0;
-        while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
-            // we start from [null, min + chunk_size) and avoid [null, min)
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep a while to avoid DDOS on MySQL server
-            maySleep(count++, tableId);
-            chunkStart = chunkEnd;
-            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, 
max, chunkSize);
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    private Object nextChunkEnd(
-            JdbcConnection jdbc,
-            Object previousChunkEnd,
-            TableId tableId,
-            String splitColumnName,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        // chunk end might be null when max values are removed
-        Object chunkEnd =
-                queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, 
previousChunkEnd);
-        if (Objects.equals(previousChunkEnd, chunkEnd)) {
-            // we don't allow equal chunk start and end,
-            // should query the next one larger than chunkEnd
-            chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
-        }
-        if (ObjectUtils.compare(chunkEnd, max) >= 0) {
-            return null;
-        } else {
-            return chunkEnd;
-        }
-    }
-
-    private SnapshotSplit createSnapshotSplit(
-            JdbcConnection jdbc,
-            TableId tableId,
-            int chunkId,
-            SeaTunnelRowType splitKeyType,
-            Object chunkStart,
-            Object chunkEnd) {
-        // currently, we only support single split column
-        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
-        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
-        return new SnapshotSplit(
-                splitId(tableId, chunkId), tableId, splitKeyType, splitStart, 
splitEnd);
-    }
-
     // 
------------------------------------------------------------------------------------------
     /** Returns the distribution factor of the given table. */
     @SuppressWarnings("MagicNumber")
-    private double calculateDistributionFactor(
+    protected double calculateDistributionFactor(
             TableId tableId, Object min, Object max, long approximateRowCnt) {
 
         if (!min.getClass().equals(max.getClass())) {
@@ -372,10 +317,66 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
         return distributionFactor;
     }
 
-    private static String splitId(TableId tableId, int chunkId) {
+    protected SnapshotSplit createSnapshotSplit(
+            JdbcConnection jdbc,
+            TableId tableId,
+            int chunkId,
+            SeaTunnelRowType splitKeyType,
+            Object chunkStart,
+            Object chunkEnd) {
+        // currently, we only support single split column
+        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
+        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
+        return new SnapshotSplit(
+                splitId(tableId, chunkId), tableId, splitKeyType, splitStart, 
splitEnd);
+    }
+
+    protected Column getSplitColumn(
+            JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId 
tableId)
+            throws SQLException {
+        Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
+        if (primaryKey.isPresent()) {
+            List<String> pkColumns = primaryKey.get().getColumnNames();
+
+            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+            for (String pkColumn : pkColumns) {
+                Column column = table.columnWithName(pkColumn);
+                if (isEvenlySplitColumn(column)) {
+                    return column;
+                }
+            }
+        }
+
+        List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
+        if (!uniqueKeys.isEmpty()) {
+            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+            for (ConstraintKey uniqueKey : uniqueKeys) {
+                List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
+                        uniqueKey.getColumnNames();
+                for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn : 
uniqueKeyColumns) {
+                    Column column = 
table.columnWithName(uniqueKeyColumn.getColumnName());
+                    if (isEvenlySplitColumn(column)) {
+                        return column;
+                    }
+                }
+            }
+        }
+
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Incremental snapshot for tables requires primary 
key/unique key,"
+                                + " but table %s doesn't have primary key.",
+                        tableId));
+    }
+
+    protected String splitId(TableId tableId, int chunkId) {
         return tableId.toString() + ":" + chunkId;
     }
 
+    protected int ObjectCompare(Object obj1, Object obj2) {
+        return ObjectUtils.compare(obj1, obj2);
+    }
+
     @SuppressWarnings("MagicNumber")
     private static void maySleep(int count, TableId tableId) {
         // every 100 queries to sleep 1s
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
index bbad9d04b1..b271be0d76 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
@@ -17,22 +17,16 @@
 
 package org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter;
 
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
-import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
 public interface JdbcSourceChunkSplitter extends ChunkSplitter {
@@ -142,6 +136,7 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
             case INT:
             case BIGINT:
             case DECIMAL:
+            case STRING:
                 return true;
             default:
                 return false;
@@ -167,42 +162,4 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
                 new String[] {splitColumn.name()},
                 new SeaTunnelDataType[] {fromDbzColumn(splitColumn)});
     }
-
-    default Column getSplitColumn(
-            JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId 
tableId)
-            throws SQLException {
-        Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
-        if (primaryKey.isPresent()) {
-            List<String> pkColumns = primaryKey.get().getColumnNames();
-
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
-            for (String pkColumn : pkColumns) {
-                Column column = table.columnWithName(pkColumn);
-                if (isEvenlySplitColumn(column)) {
-                    return column;
-                }
-            }
-        }
-
-        List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
-        if (!uniqueKeys.isEmpty()) {
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
-            for (ConstraintKey uniqueKey : uniqueKeys) {
-                List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
-                        uniqueKey.getColumnNames();
-                for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn : 
uniqueKeyColumns) {
-                    Column column = 
table.columnWithName(uniqueKeyColumn.getColumnName());
-                    if (isEvenlySplitColumn(column)) {
-                        return column;
-                    }
-                }
-            }
-        }
-
-        throw new UnsupportedOperationException(
-                String.format(
-                        "Incremental snapshot for tables requires primary 
key/unique key,"
-                                + " but table %s doesn't have primary key.",
-                        tableId));
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java
index 3c5b669a25..0f703f02c1 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java
@@ -63,6 +63,8 @@ public class ObjectUtils {
                     ((BigInteger) minuend).subtract((BigInteger) 
subtrahend).toString());
         } else if (minuend instanceof BigDecimal) {
             return ((BigDecimal) minuend).subtract((BigDecimal) subtrahend);
+        } else if (minuend instanceof String) {
+            return BigDecimal.valueOf(Long.MAX_VALUE);
         } else {
             throw new UnsupportedOperationException(
                     String.format(
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index 04671d28f5..0249889b23 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -21,86 +21,21 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkRange;
-import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.JdbcSourceChunkSplitter;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
-import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
+import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlTypeUtils;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
 import io.debezium.relational.TableId;
 
-import java.math.BigDecimal;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import static java.math.BigDecimal.ROUND_CEILING;
-import static 
org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
-public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlChunkSplitter.class);
-
-    private final JdbcSourceConfig sourceConfig;
-    private final JdbcDataSourceDialect dialect;
+public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter {
 
     public MySqlChunkSplitter(JdbcSourceConfig sourceConfig, 
JdbcDataSourceDialect dialect) {
-        this.sourceConfig = sourceConfig;
-        this.dialect = dialect;
-    }
-
-    @Override
-    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
-        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
-            LOG.info("Start splitting table {} into chunks...", tableId);
-            long start = System.currentTimeMillis();
-
-            Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new RuntimeException("Failed to split chunks for table " 
+ tableId, e);
-            }
-
-            // convert chunks into splits
-            List<SnapshotSplit> splits = new ArrayList<>();
-            SeaTunnelRowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
-            }
-
-            long end = System.currentTimeMillis();
-            LOG.info(
-                    "Split table {} into {} chunks, time cost: {}ms.",
-                    tableId,
-                    splits.size(),
-                    end - start);
-            return splits;
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    String.format("Generate Splits for table %s error", 
tableId), e);
-        }
+        super(sourceConfig, dialect);
     }
 
     @Override
@@ -153,242 +88,4 @@ public class MySqlChunkSplitter implements 
JdbcSourceChunkSplitter {
     public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
         return MySqlTypeUtils.convertFromColumn(splitColumn);
     }
-
-    // 
--------------------------------------------------------------------------------------------
-    // Utilities
-    // 
--------------------------------------------------------------------------------------------
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
-     * many queries and is not efficient.
-     */
-    private List<ChunkRange> splitTableIntoChunks(
-            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
-        final String splitColumnName = splitColumn.name();
-        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
-        final Object min = minMax[0];
-        final Object max = minMax[1];
-        if (min == null || max == null || min.equals(max)) {
-            // empty table, or only one row, return full table scan as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final int chunkSize = sourceConfig.getSplitSize();
-        final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
-        final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
-
-        if (isEvenlySplitColumn(splitColumn)) {
-            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
-            double distributionFactor =
-                    calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
-                            && doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tableId, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
-            } else {
-                int shardCount = (int) (approximateRowCnt / chunkSize);
-                if (sourceConfig.getSampleShardingThreshold() < shardCount) {
-                    Object[] sample =
-                            sampleDataFromColumn(
-                                    jdbc,
-                                    tableId,
-                                    splitColumnName,
-                                    sourceConfig.getInverseSamplingRate());
-                    // In order to prevent data loss due to the absence of the 
minimum value in the
-                    // sampled data, the minimum value is directly added here.
-                    Object[] newSample = new Object[sample.length + 1];
-                    newSample[0] = min;
-                    System.arraycopy(sample, 0, newSample, 1, sample.length);
-                    return efficientShardingThroughSampling(
-                            tableId, newSample, approximateRowCnt, shardCount);
-                }
-                return splitUnevenlySizedChunks(
-                        jdbc, tableId, splitColumnName, min, max, chunkSize);
-            }
-        } else {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, 
min, max, chunkSize);
-        }
-    }
-
-    private List<ChunkRange> efficientShardingThroughSampling(
-            TableId tableId, Object[] sampleData, long approximateRowCnt, int 
shardCount) {
-        LOG.info(
-                "Use efficient sharding through sampling optimization for 
table {}, the approximate row count is {}, the shardCount is {}",
-                tableId,
-                approximateRowCnt,
-                shardCount);
-
-        final List<ChunkRange> splits = new ArrayList<>();
-
-        // Calculate the shard boundaries
-        for (int i = 0; i < shardCount; i++) {
-            Object chunkStart = sampleData[(int) ((long) i * sampleData.length 
/ shardCount)];
-            Object chunkEnd =
-                    i < shardCount - 1
-                            ? sampleData[(int) (((long) i + 1) * 
sampleData.length / shardCount)]
-                            : null;
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-        }
-
-        return splits;
-    }
-
-    /**
-     * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
-     * and tumble chunks in step size.
-     */
-    private List<ChunkRange> splitEvenlySizedChunks(
-            TableId tableId,
-            Object min,
-            Object max,
-            long approximateRowCnt,
-            int chunkSize,
-            int dynamicChunkSize) {
-        LOG.info(
-                "Use evenly-sized chunk optimization for table {}, the 
approximate row count is {}, the chunk size is {}, the dynamic chunk size is 
{}",
-                tableId,
-                approximateRowCnt,
-                chunkSize,
-                dynamicChunkSize);
-        if (approximateRowCnt <= chunkSize) {
-            // there is no more than one chunk, return full table as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
-        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            chunkStart = chunkEnd;
-            try {
-                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
-            } catch (ArithmeticException e) {
-                // Stop chunk split to avoid dead loop when number overflows.
-                break;
-            }
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
-    private List<ChunkRange> splitUnevenlySizedChunks(
-            JdbcConnection jdbc,
-            TableId tableId,
-            String splitColumnName,
-            Object min,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        LOG.info(
-                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, 
max, chunkSize);
-        int count = 0;
-        while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
-            // we start from [null, min + chunk_size) and avoid [null, min)
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep a while to avoid DDOS on MySQL server
-            maySleep(count++, tableId);
-            chunkStart = chunkEnd;
-            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, 
max, chunkSize);
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    private Object nextChunkEnd(
-            JdbcConnection jdbc,
-            Object previousChunkEnd,
-            TableId tableId,
-            String splitColumnName,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        // chunk end might be null when max values are removed
-        Object chunkEnd =
-                queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, 
previousChunkEnd);
-        if (Objects.equals(previousChunkEnd, chunkEnd)) {
-            // we don't allow equal chunk start and end,
-            // should query the next one larger than chunkEnd
-            chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
-        }
-        if (ObjectUtils.compare(chunkEnd, max) >= 0) {
-            return null;
-        } else {
-            return chunkEnd;
-        }
-    }
-
-    private SnapshotSplit createSnapshotSplit(
-            JdbcConnection jdbc,
-            TableId tableId,
-            int chunkId,
-            SeaTunnelRowType splitKeyType,
-            Object chunkStart,
-            Object chunkEnd) {
-        // currently, we only support single split column
-        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
-        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
-        return new SnapshotSplit(
-                splitId(tableId, chunkId), tableId, splitKeyType, splitStart, 
splitEnd);
-    }
-
-    // 
------------------------------------------------------------------------------------------
-    /** Returns the distribution factor of the given table. */
-    @SuppressWarnings("MagicNumber")
-    private double calculateDistributionFactor(
-            TableId tableId, Object min, Object max, long approximateRowCnt) {
-
-        if (!min.getClass().equals(max.getClass())) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
-                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
-        }
-        if (approximateRowCnt == 0) {
-            return Double.MAX_VALUE;
-        }
-        BigDecimal difference = ObjectUtils.minus(max, min);
-        // factor = (max - min + 1) / rowCount
-        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
-        double distributionFactor =
-                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, 
ROUND_CEILING).doubleValue();
-        LOG.info(
-                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
-                tableId,
-                distributionFactor,
-                min,
-                max,
-                approximateRowCnt);
-        return distributionFactor;
-    }
-
-    private static String splitId(TableId tableId, int chunkId) {
-        return tableId.toString() + ":" + chunkId;
-    }
-
-    @SuppressWarnings("MagicNumber")
-    private static void maySleep(int count, TableId tableId) {
-        // every 100 queries to sleep 1s
-        if (count % 10 == 0) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // nothing to do
-            }
-            LOG.info("JdbcSourceChunkSplitter has split {} chunks for table 
{}", count, tableId);
-        }
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index ac0b8165db..7efd53dc3f 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -21,10 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkRange;
-import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.JdbcSourceChunkSplitter;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
-import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
+import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerUtils;
 
@@ -33,71 +30,14 @@ import io.debezium.relational.Column;
 import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 
-import java.math.BigDecimal;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import static java.math.BigDecimal.ROUND_CEILING;
-import static 
org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
 @Slf4j
-public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
-
-    private final JdbcSourceConfig sourceConfig;
-    private final JdbcDataSourceDialect dialect;
+public class SqlServerChunkSplitter extends AbstractJdbcSourceChunkSplitter {
 
     public SqlServerChunkSplitter(JdbcSourceConfig sourceConfig, 
JdbcDataSourceDialect dialect) {
-        this.sourceConfig = sourceConfig;
-        this.dialect = dialect;
-    }
-
-    @Override
-    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
-        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
-            log.info("Start splitting table {} into chunks...", tableId);
-            long start = System.currentTimeMillis();
-
-            Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new RuntimeException("Failed to split chunks for table " 
+ tableId, e);
-            }
-
-            // convert chunks into splits
-            List<SnapshotSplit> splits = new ArrayList<>();
-            SeaTunnelRowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
-            }
-
-            long end = System.currentTimeMillis();
-            log.info(
-                    "Split table {} into {} chunks, time cost: {}ms.",
-                    tableId,
-                    splits.size(),
-                    end - start);
-            return splits;
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    String.format("Generate Splits for table %s error", 
tableId), e);
-        }
+        super(sourceConfig, dialect);
     }
 
     @Override
@@ -150,242 +90,4 @@ public class SqlServerChunkSplitter implements 
JdbcSourceChunkSplitter {
     public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
         return SqlServerTypeUtils.convertFromColumn(splitColumn);
     }
-
-    // 
--------------------------------------------------------------------------------------------
-    // Utilities
-    // 
--------------------------------------------------------------------------------------------
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
-     * many queries and is not efficient.
-     */
-    private List<ChunkRange> splitTableIntoChunks(
-            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
-        final String splitColumnName = splitColumn.name();
-        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
-        final Object min = minMax[0];
-        final Object max = minMax[1];
-        if (min == null || max == null || min.equals(max)) {
-            // empty table, or only one row, return full table scan as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final int chunkSize = sourceConfig.getSplitSize();
-        final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
-        final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
-
-        if (isEvenlySplitColumn(splitColumn)) {
-            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
-            double distributionFactor =
-                    calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
-                            && doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tableId, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
-            } else {
-                int shardCount = (int) (approximateRowCnt / chunkSize);
-                if (sourceConfig.getSampleShardingThreshold() < shardCount) {
-                    Object[] sample =
-                            sampleDataFromColumn(
-                                    jdbc,
-                                    tableId,
-                                    splitColumnName,
-                                    sourceConfig.getInverseSamplingRate());
-                    // In order to prevent data loss due to the absence of the 
minimum value in the
-                    // sampled data, the minimum value is directly added here.
-                    Object[] newSample = new Object[sample.length + 1];
-                    newSample[0] = min;
-                    System.arraycopy(sample, 0, newSample, 1, sample.length);
-                    return efficientShardingThroughSampling(
-                            tableId, newSample, approximateRowCnt, shardCount);
-                }
-                return splitUnevenlySizedChunks(
-                        jdbc, tableId, splitColumnName, min, max, chunkSize);
-            }
-        } else {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, 
min, max, chunkSize);
-        }
-    }
-
-    private List<ChunkRange> efficientShardingThroughSampling(
-            TableId tableId, Object[] sampleData, long approximateRowCnt, int 
shardCount) {
-        log.info(
-                "Use efficient sharding through sampling optimization for 
table {}, the approximate row count is {}, the shardCount is {}",
-                tableId,
-                approximateRowCnt,
-                shardCount);
-
-        final List<ChunkRange> splits = new ArrayList<>();
-
-        // Calculate the shard boundaries
-        for (int i = 0; i < shardCount; i++) {
-            Object chunkStart = sampleData[(int) ((long) i * sampleData.length 
/ shardCount)];
-            Object chunkEnd =
-                    i < shardCount - 1
-                            ? sampleData[(int) (((long) i + 1) * 
sampleData.length / shardCount)]
-                            : null;
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-        }
-
-        return splits;
-    }
-
-    /**
-     * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
-     * and tumble chunks in step size.
-     */
-    private List<ChunkRange> splitEvenlySizedChunks(
-            TableId tableId,
-            Object min,
-            Object max,
-            long approximateRowCnt,
-            int chunkSize,
-            int dynamicChunkSize) {
-        log.info(
-                "Use evenly-sized chunk optimization for table {}, the 
approximate row count is {}, the chunk size is {}, the dynamic chunk size is 
{}",
-                tableId,
-                approximateRowCnt,
-                chunkSize,
-                dynamicChunkSize);
-        if (approximateRowCnt <= chunkSize) {
-            // there is no more than one chunk, return full table as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
-        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            chunkStart = chunkEnd;
-            try {
-                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
-            } catch (ArithmeticException e) {
-                // Stop chunk split to avoid dead loop when number overflows.
-                break;
-            }
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
-    private List<ChunkRange> splitUnevenlySizedChunks(
-            JdbcConnection jdbc,
-            TableId tableId,
-            String splitColumnName,
-            Object min,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        log.info(
-                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, 
max, chunkSize);
-        int count = 0;
-        while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
-            // we start from [null, min + chunk_size) and avoid [null, min)
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep a while to avoid DDOS on MySQL server
-            maySleep(count++, tableId);
-            chunkStart = chunkEnd;
-            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, 
max, chunkSize);
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    private Object nextChunkEnd(
-            JdbcConnection jdbc,
-            Object previousChunkEnd,
-            TableId tableId,
-            String splitColumnName,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        // chunk end might be null when max values are removed
-        Object chunkEnd =
-                queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, 
previousChunkEnd);
-        if (Objects.equals(previousChunkEnd, chunkEnd)) {
-            // we don't allow equal chunk start and end,
-            // should query the next one larger than chunkEnd
-            chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
-        }
-        if (ObjectUtils.compare(chunkEnd, max) >= 0) {
-            return null;
-        } else {
-            return chunkEnd;
-        }
-    }
-
-    private SnapshotSplit createSnapshotSplit(
-            JdbcConnection jdbc,
-            TableId tableId,
-            int chunkId,
-            SeaTunnelRowType splitKeyType,
-            Object chunkStart,
-            Object chunkEnd) {
-        // currently, we only support single split column
-        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
-        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
-        return new SnapshotSplit(
-                splitId(tableId, chunkId), tableId, splitKeyType, splitStart, 
splitEnd);
-    }
-
-    // 
------------------------------------------------------------------------------------------
-    /** Returns the distribution factor of the given table. */
-    @SuppressWarnings("MagicNumber")
-    private double calculateDistributionFactor(
-            TableId tableId, Object min, Object max, long approximateRowCnt) {
-
-        if (!min.getClass().equals(max.getClass())) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
-                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
-        }
-        if (approximateRowCnt == 0) {
-            return Double.MAX_VALUE;
-        }
-        BigDecimal difference = ObjectUtils.minus(max, min);
-        // factor = (max - min + 1) / rowCount
-        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
-        double distributionFactor =
-                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, 
ROUND_CEILING).doubleValue();
-        log.info(
-                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
-                tableId,
-                distributionFactor,
-                min,
-                max,
-                approximateRowCnt);
-        return distributionFactor;
-    }
-
-    private static String splitId(TableId tableId, int chunkId) {
-        return tableId.toString() + ":" + chunkId;
-    }
-
-    @SuppressWarnings("MagicNumber")
-    private static void maySleep(int count, TableId tableId) {
-        // every 100 queries to sleep 1s
-        if (count % 10 == 0) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // nothing to do
-            }
-            log.info("JdbcSourceChunkSplitter has split {} chunks for table 
{}", count, tableId);
-        }
-    }
 }

Reply via email to