loserwang1024 commented on code in PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#discussion_r1657064426


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##########
@@ -132,16 +172,237 @@ default boolean isEvenlySplitColumn(Column splitColumn) {
      * @param splitColumn dbz split column.
      * @return flink data type
      */
-    DataType fromDbzColumn(Column splitColumn);
+    protected abstract DataType fromDbzColumn(Column splitColumn);
+
+    /** Returns the distribution factor of the given table. */
+    protected double calculateDistributionFactor(
+            TableId tableId, Object min, Object max, long approximateRowCnt) {
+
+        if (!min.getClass().equals(max.getClass())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
+                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
+        }
+        if (approximateRowCnt == 0) {
+            return Double.MAX_VALUE;
+        }
+        BigDecimal difference = ObjectUtils.minus(max, min);
+        // factor = (max - min + 1) / rowCount
+        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
+        double distributionFactor =
+                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, 
ROUND_CEILING).doubleValue();
+        LOG.info(
+                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
+                tableId,
+                distributionFactor,
+                min,
+                max,
+                approximateRowCnt);
+        return distributionFactor;
+    }
+
+    /**
+     * Get the column which is seen as chunk key.
+     *
+     * @param table table identity.
+     * @param chunkKeyColumn column name which is seen as chunk key, if 
chunkKeyColumn is null, use
+     *     primary key instead. @Column the column which is seen as chunk key.
+     */
+    protected Column getSplitColumn(Table table, @Nullable String 
chunkKeyColumn) {
+        return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
+    }
+
+    /** ChunkEnd less than or equal to max. */
+    protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+        return ObjectUtils.compare(chunkEnd, max) <= 0;
+    }
+
+    /** ChunkEnd greater than or equal to max. */
+    protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+        return ObjectUtils.compare(chunkEnd, max) >= 0;
+    }
 
     /**
      * convert dbz column to Flink row type.
      *
      * @param splitColumn split column.
      * @return flink row type.
      */
-    default RowType getSplitType(Column splitColumn) {
+    private RowType getSplitType(Column splitColumn) {
         return (RowType)
                 ROW(FIELD(splitColumn.name(), 
fromDbzColumn(splitColumn))).getLogicalType();
     }
+
+    /**
+     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
+     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
+     * many queries and is not efficient.
+     */
+    private List<ChunkRange> splitTableIntoChunks(
+            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
+        final String splitColumnName = splitColumn.name();
+        final Object[] minMax = JdbcChunkUtils.queryMinMax(jdbc, tableId, 
splitColumnName);
+        final Object min = minMax[0];
+        final Object max = minMax[1];
+        if (min == null || max == null || min.equals(max)) {
+            // empty table, or only one row, return full table scan as a chunk
+            return Collections.singletonList(ChunkRange.all());
+        }
+
+        final int chunkSize = sourceConfig.getSplitSize();
+        final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
+        final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
+
+        if (isEvenlySplitColumn(splitColumn)) {
+            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
+            double distributionFactor =
+                    calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
+
+            boolean dataIsEvenlyDistributed =
+                    doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
+                            && doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
+
+            if (dataIsEvenlyDistributed) {
+                // the minimum dynamic chunk size is at least 1
+                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
+                return splitEvenlySizedChunks(
+                        tableId, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
+            } else {
+                return splitUnevenlySizedChunks(
+                        jdbc, tableId, splitColumnName, min, max, chunkSize);
+            }
+        } else {
+            return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, 
min, max, chunkSize);
+        }
+    }
+
+    /**
+     * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
+     * and tumble chunks in step size.
+     */
+    private List<ChunkRange> splitEvenlySizedChunks(
+            TableId tableId,
+            Object min,
+            Object max,
+            long approximateRowCnt,
+            int chunkSize,
+            int dynamicChunkSize) {
+        LOG.info(
+                "Use evenly-sized chunk optimization for table {}, the 
approximate row count is {}, the chunk size is {}, the dynamic chunk size is 
{}",
+                tableId,
+                approximateRowCnt,
+                chunkSize,
+                dynamicChunkSize);
+        if (approximateRowCnt <= chunkSize) {
+            // there is no more than one chunk, return full table as a chunk
+            return Collections.singletonList(ChunkRange.all());
+        }
+
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
+        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
+            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            chunkStart = chunkEnd;
+            try {
+                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
+            } catch (ArithmeticException e) {
+                // Stop chunk split to avoid dead loop when number overflows.
+                break;
+            }
+        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
+        return splits;
+    }
+
+    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
+    private List<ChunkRange> splitUnevenlySizedChunks(
+            JdbcConnection jdbc,
+            TableId tableId,
+            String splitColumnName,
+            Object min,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        LOG.info(
+                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, 
max, chunkSize);
+        int count = 0;
+        while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
+            // we start from [null, min + chunk_size) and avoid [null, min)
+            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            // may sleep a while to avoid DDOS on PostgreSQL server
+            maySleep(count++, tableId);
+            chunkStart = chunkEnd;
+            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, 
max, chunkSize);
+        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
+        return splits;
+    }
+
+    private Object nextChunkEnd(
+            JdbcConnection jdbc,
+            Object previousChunkEnd,
+            TableId tableId,
+            String splitColumnName,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        // chunk end might be null when max values are removed
+        Object chunkEnd =
+                queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, 
previousChunkEnd);
+        if (Objects.equals(previousChunkEnd, chunkEnd)) {
+            // we don't allow equal chunk start and end,
+            // should query the next one larger than chunkEnd
+            chunkEnd = JdbcChunkUtils.queryMin(jdbc, tableId, splitColumnName, 
chunkEnd);

Review Comment:
   done it.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##########
@@ -132,16 +172,237 @@ default boolean isEvenlySplitColumn(Column splitColumn) {
      * @param splitColumn dbz split column.
      * @return flink data type
      */
-    DataType fromDbzColumn(Column splitColumn);
+    protected abstract DataType fromDbzColumn(Column splitColumn);
+
+    /** Returns the distribution factor of the given table. */
+    protected double calculateDistributionFactor(
+            TableId tableId, Object min, Object max, long approximateRowCnt) {
+
+        if (!min.getClass().equals(max.getClass())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
+                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
+        }
+        if (approximateRowCnt == 0) {
+            return Double.MAX_VALUE;
+        }
+        BigDecimal difference = ObjectUtils.minus(max, min);
+        // factor = (max - min + 1) / rowCount
+        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
+        double distributionFactor =
+                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, 
ROUND_CEILING).doubleValue();
+        LOG.info(
+                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
+                tableId,
+                distributionFactor,
+                min,
+                max,
+                approximateRowCnt);
+        return distributionFactor;
+    }
+
+    /**
+     * Get the column which is seen as chunk key.
+     *
+     * @param table table identity.
+     * @param chunkKeyColumn column name which is seen as chunk key, if 
chunkKeyColumn is null, use
+     *     primary key instead. @Column the column which is seen as chunk key.
+     */
+    protected Column getSplitColumn(Table table, @Nullable String 
chunkKeyColumn) {
+        return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
+    }
+
+    /** ChunkEnd less than or equal to max. */
+    protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+        return ObjectUtils.compare(chunkEnd, max) <= 0;
+    }
+
+    /** ChunkEnd greater than or equal to max. */
+    protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+        return ObjectUtils.compare(chunkEnd, max) >= 0;
+    }
 
     /**
      * convert dbz column to Flink row type.
      *
      * @param splitColumn split column.
      * @return flink row type.
      */
-    default RowType getSplitType(Column splitColumn) {
+    private RowType getSplitType(Column splitColumn) {
         return (RowType)
                 ROW(FIELD(splitColumn.name(), 
fromDbzColumn(splitColumn))).getLogicalType();
     }
+
+    /**
+     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
+     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
+     * many queries and is not efficient.
+     */
+    private List<ChunkRange> splitTableIntoChunks(
+            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
+        final String splitColumnName = splitColumn.name();
+        final Object[] minMax = JdbcChunkUtils.queryMinMax(jdbc, tableId, 
splitColumnName);
+        final Object min = minMax[0];
+        final Object max = minMax[1];
+        if (min == null || max == null || min.equals(max)) {
+            // empty table, or only one row, return full table scan as a chunk
+            return Collections.singletonList(ChunkRange.all());
+        }
+
+        final int chunkSize = sourceConfig.getSplitSize();
+        final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
+        final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
+
+        if (isEvenlySplitColumn(splitColumn)) {
+            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
+            double distributionFactor =
+                    calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
+
+            boolean dataIsEvenlyDistributed =
+                    doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
+                            && doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
+
+            if (dataIsEvenlyDistributed) {
+                // the minimum dynamic chunk size is at least 1
+                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
+                return splitEvenlySizedChunks(
+                        tableId, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
+            } else {
+                return splitUnevenlySizedChunks(
+                        jdbc, tableId, splitColumnName, min, max, chunkSize);
+            }
+        } else {
+            return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, 
min, max, chunkSize);
+        }
+    }
+
+    /**
+     * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
+     * and tumble chunks in step size.
+     */
+    private List<ChunkRange> splitEvenlySizedChunks(
+            TableId tableId,
+            Object min,
+            Object max,
+            long approximateRowCnt,
+            int chunkSize,
+            int dynamicChunkSize) {
+        LOG.info(
+                "Use evenly-sized chunk optimization for table {}, the 
approximate row count is {}, the chunk size is {}, the dynamic chunk size is 
{}",
+                tableId,
+                approximateRowCnt,
+                chunkSize,
+                dynamicChunkSize);
+        if (approximateRowCnt <= chunkSize) {
+            // there is no more than one chunk, return full table as a chunk
+            return Collections.singletonList(ChunkRange.all());
+        }
+
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
+        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
+            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            chunkStart = chunkEnd;
+            try {
+                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
+            } catch (ArithmeticException e) {
+                // Stop chunk split to avoid dead loop when number overflows.
+                break;
+            }
+        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
+        return splits;
+    }
+
+    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
+    private List<ChunkRange> splitUnevenlySizedChunks(
+            JdbcConnection jdbc,
+            TableId tableId,
+            String splitColumnName,
+            Object min,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        LOG.info(
+                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, 
max, chunkSize);
+        int count = 0;
+        while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
+            // we start from [null, min + chunk_size) and avoid [null, min)
+            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            // may sleep a while to avoid DDOS on PostgreSQL server
+            maySleep(count++, tableId);
+            chunkStart = chunkEnd;
+            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, 
max, chunkSize);
+        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
+        return splits;
+    }
+
+    private Object nextChunkEnd(
+            JdbcConnection jdbc,
+            Object previousChunkEnd,
+            TableId tableId,
+            String splitColumnName,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        // chunk end might be null when max values are removed
+        Object chunkEnd =
+                queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, 
previousChunkEnd);
+        if (Objects.equals(previousChunkEnd, chunkEnd)) {
+            // we don't allow equal chunk start and end,
+            // should query the next one larger than chunkEnd
+            chunkEnd = JdbcChunkUtils.queryMin(jdbc, tableId, splitColumnName, 
chunkEnd);
+        }
+        if (isChunkEndGeMax(chunkEnd, max)) {
+            return null;
+        } else {
+            return chunkEnd;
+        }
+    }
+
+    private SnapshotSplit createSnapshotSplit(
+            JdbcConnection jdbc,
+            TableId tableId,
+            int chunkId,
+            RowType splitKeyType,
+            Object chunkStart,
+            Object chunkEnd) {
+        // currently, we only support single split column
+        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
+        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
+        Map<TableId, TableChanges.TableChange> schema = new HashMap<>();
+        schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
+        return new SnapshotSplit(
+                tableId,
+                splitId(tableId, chunkId),
+                splitKeyType,
+                splitStart,
+                splitEnd,
+                null,
+                schema);
+    }
+
+    private static String splitId(TableId tableId, int chunkId) {

Review Comment:
   done it.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##########
@@ -132,16 +172,237 @@ default boolean isEvenlySplitColumn(Column splitColumn) {
      * @param splitColumn dbz split column.
      * @return flink data type
      */
-    DataType fromDbzColumn(Column splitColumn);
+    protected abstract DataType fromDbzColumn(Column splitColumn);
+
+    /** Returns the distribution factor of the given table. */
+    protected double calculateDistributionFactor(
+            TableId tableId, Object min, Object max, long approximateRowCnt) {
+
+        if (!min.getClass().equals(max.getClass())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
+                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
+        }
+        if (approximateRowCnt == 0) {
+            return Double.MAX_VALUE;
+        }
+        BigDecimal difference = ObjectUtils.minus(max, min);
+        // factor = (max - min + 1) / rowCount
+        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
+        double distributionFactor =
+                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, 
ROUND_CEILING).doubleValue();
+        LOG.info(
+                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
+                tableId,
+                distributionFactor,
+                min,
+                max,
+                approximateRowCnt);
+        return distributionFactor;
+    }
+
+    /**
+     * Get the column which is seen as chunk key.
+     *
+     * @param table table identity.
+     * @param chunkKeyColumn column name which is seen as chunk key, if 
chunkKeyColumn is null, use
+     *     primary key instead. @Column the column which is seen as chunk key.
+     */
+    protected Column getSplitColumn(Table table, @Nullable String 
chunkKeyColumn) {
+        return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
+    }
+
+    /** ChunkEnd less than or equal to max. */
+    protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+        return ObjectUtils.compare(chunkEnd, max) <= 0;
+    }
+
+    /** ChunkEnd greater than or equal to max. */
+    protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+        return ObjectUtils.compare(chunkEnd, max) >= 0;
+    }
 
     /**
      * convert dbz column to Flink row type.
      *
      * @param splitColumn split column.
      * @return flink row type.
      */
-    default RowType getSplitType(Column splitColumn) {
+    private RowType getSplitType(Column splitColumn) {
         return (RowType)
                 ROW(FIELD(splitColumn.name(), 
fromDbzColumn(splitColumn))).getLogicalType();
     }
+
+    /**
+     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
+     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
+     * many queries and is not efficient.
+     */
+    private List<ChunkRange> splitTableIntoChunks(
+            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
+        final String splitColumnName = splitColumn.name();
+        final Object[] minMax = JdbcChunkUtils.queryMinMax(jdbc, tableId, 
splitColumnName);
+        final Object min = minMax[0];
+        final Object max = minMax[1];
+        if (min == null || max == null || min.equals(max)) {
+            // empty table, or only one row, return full table scan as a chunk
+            return Collections.singletonList(ChunkRange.all());
+        }
+
+        final int chunkSize = sourceConfig.getSplitSize();
+        final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
+        final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
+
+        if (isEvenlySplitColumn(splitColumn)) {
+            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
+            double distributionFactor =
+                    calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
+
+            boolean dataIsEvenlyDistributed =
+                    doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
+                            && doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
+
+            if (dataIsEvenlyDistributed) {
+                // the minimum dynamic chunk size is at least 1
+                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
+                return splitEvenlySizedChunks(
+                        tableId, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
+            } else {
+                return splitUnevenlySizedChunks(
+                        jdbc, tableId, splitColumnName, min, max, chunkSize);
+            }
+        } else {
+            return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, 
min, max, chunkSize);
+        }
+    }
+
+    /**
+     * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
+     * and tumble chunks in step size.
+     */
+    private List<ChunkRange> splitEvenlySizedChunks(
+            TableId tableId,
+            Object min,
+            Object max,
+            long approximateRowCnt,
+            int chunkSize,
+            int dynamicChunkSize) {
+        LOG.info(
+                "Use evenly-sized chunk optimization for table {}, the 
approximate row count is {}, the chunk size is {}, the dynamic chunk size is 
{}",
+                tableId,
+                approximateRowCnt,
+                chunkSize,
+                dynamicChunkSize);
+        if (approximateRowCnt <= chunkSize) {
+            // there is no more than one chunk, return full table as a chunk
+            return Collections.singletonList(ChunkRange.all());
+        }
+
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
+        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
+            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            chunkStart = chunkEnd;
+            try {
+                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
+            } catch (ArithmeticException e) {
+                // Stop chunk split to avoid dead loop when number overflows.
+                break;
+            }
+        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
+        return splits;
+    }
+
+    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
+    private List<ChunkRange> splitUnevenlySizedChunks(
+            JdbcConnection jdbc,
+            TableId tableId,
+            String splitColumnName,
+            Object min,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        LOG.info(
+                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, 
max, chunkSize);
+        int count = 0;
+        while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
+            // we start from [null, min + chunk_size) and avoid [null, min)
+            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            // may sleep a while to avoid DDOS on PostgreSQL server
+            maySleep(count++, tableId);
+            chunkStart = chunkEnd;
+            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, 
max, chunkSize);
+        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
+        return splits;
+    }
+
+    private Object nextChunkEnd(
+            JdbcConnection jdbc,
+            Object previousChunkEnd,
+            TableId tableId,
+            String splitColumnName,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        // chunk end might be null when max values are removed
+        Object chunkEnd =
+                queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, 
previousChunkEnd);
+        if (Objects.equals(previousChunkEnd, chunkEnd)) {
+            // we don't allow equal chunk start and end,
+            // should query the next one larger than chunkEnd
+            chunkEnd = JdbcChunkUtils.queryMin(jdbc, tableId, splitColumnName, 
chunkEnd);
+        }
+        if (isChunkEndGeMax(chunkEnd, max)) {
+            return null;
+        } else {
+            return chunkEnd;
+        }
+    }
+
+    private SnapshotSplit createSnapshotSplit(
+            JdbcConnection jdbc,
+            TableId tableId,
+            int chunkId,
+            RowType splitKeyType,
+            Object chunkStart,
+            Object chunkEnd) {
+        // currently, we only support single split column
+        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
+        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
+        Map<TableId, TableChanges.TableChange> schema = new HashMap<>();
+        schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
+        return new SnapshotSplit(
+                tableId,
+                splitId(tableId, chunkId),
+                splitKeyType,
+                splitStart,
+                splitEnd,
+                null,
+                schema);
+    }
+
+    private static String splitId(TableId tableId, int chunkId) {
+        return tableId.toString() + ":" + chunkId;
+    }
+
+    private static void maySleep(int count, TableId tableId) {

Review Comment:
   done it.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##########
@@ -132,16 +172,237 @@ default boolean isEvenlySplitColumn(Column splitColumn) {
      * @param splitColumn dbz split column.
      * @return flink data type
      */
-    DataType fromDbzColumn(Column splitColumn);
+    protected abstract DataType fromDbzColumn(Column splitColumn);
+
+    /** Returns the distribution factor of the given table. */
+    protected double calculateDistributionFactor(
+            TableId tableId, Object min, Object max, long approximateRowCnt) {
+
+        if (!min.getClass().equals(max.getClass())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
+                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
+        }
+        if (approximateRowCnt == 0) {
+            return Double.MAX_VALUE;
+        }
+        BigDecimal difference = ObjectUtils.minus(max, min);
+        // factor = (max - min + 1) / rowCount
+        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
+        double distributionFactor =
+                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, 
ROUND_CEILING).doubleValue();
+        LOG.info(
+                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
+                tableId,
+                distributionFactor,
+                min,
+                max,
+                approximateRowCnt);
+        return distributionFactor;
+    }
+
+    /**
+     * Get the column which is seen as chunk key.
+     *
+     * @param table table identity.
+     * @param chunkKeyColumn column name which is seen as chunk key, if 
chunkKeyColumn is null, use
+     *     primary key instead. @Column the column which is seen as chunk key.
+     */
+    protected Column getSplitColumn(Table table, @Nullable String 
chunkKeyColumn) {
+        return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
+    }
+
+    /** ChunkEnd less than or equal to max. */
+    protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+        return ObjectUtils.compare(chunkEnd, max) <= 0;
+    }
+
+    /** ChunkEnd greater than or equal to max. */
+    protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+        return ObjectUtils.compare(chunkEnd, max) >= 0;
+    }
 
     /**
      * convert dbz column to Flink row type.
      *
      * @param splitColumn split column.
      * @return flink row type.
      */
-    default RowType getSplitType(Column splitColumn) {
+    private RowType getSplitType(Column splitColumn) {
         return (RowType)
                 ROW(FIELD(splitColumn.name(), 
fromDbzColumn(splitColumn))).getLogicalType();
     }
+
+    /**
+     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
+     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
+     * many queries and is not efficient.
+     */
+    private List<ChunkRange> splitTableIntoChunks(
+            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
+        final String splitColumnName = splitColumn.name();
+        final Object[] minMax = JdbcChunkUtils.queryMinMax(jdbc, tableId, 
splitColumnName);

Review Comment:
   done it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to