This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d5847347de [Improve][Connector-v2][MySQL] Optimize shard calculation strategy (#9975)
d5847347de is described below

commit d5847347de25225b5eb3e379a4a2001b959daee3
Author: Jast <[email protected]>
AuthorDate: Fri Nov 21 16:12:46 2025 +0800

    [Improve][Connector-v2][MySQL] Optimize shard calculation strategy (#9975)
---
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |   5 +
 .../internal/dialect/mysql/MysqlDialectTest.java   | 191 +++++++++++++++++++++
 2 files changed, 196 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 88ad8f3d8e..8bf1edc331 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -137,6 +137,11 @@ public class MysqlDialect implements JdbcDialect {
         return map;
     }
 
+    @Override
+    public String hashModForField(String fieldName, int mod) {
+        return "ABS(CRC32(" + quoteIdentifier(fieldName) + ") % " + mod + ")";
+    }
+
     @Override
     public TablePath parse(String tablePath) {
         return TablePath.of(tablePath, false);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialectTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialectTest.java
new file mode 100644
index 0000000000..51fe7d57b6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialectTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;
import java.util.zip.CRC32;
+
+@Slf4j
+public class MysqlDialectTest {
+
+    @Test
+    public void testHashDistributionMD5vsCRC32WithSnowflakeIds() {
+        int totalRecords = 1_100_000;
+        int partitions = 10;
+        List<String> snowflakeIds = generateSnowflakeIds(totalRecords);
+
+        Map<Integer, Integer> md5Distribution = new HashMap<>();
+        for (int i = 0; i < partitions; i++) {
+            md5Distribution.put(i, 0);
+        }
+
+        for (String id : snowflakeIds) {
+            int partition = calculateMD5Partition(id, partitions);
+            md5Distribution.put(partition, md5Distribution.get(partition) + 1);
+        }
+
+        Map<Integer, Integer> crc32Distribution = new HashMap<>();
+        for (int i = 0; i < partitions; i++) {
+            crc32Distribution.put(i, 0);
+        }
+
+        for (String id : snowflakeIds) {
+            int partition = calculateCRC32Partition(id, partitions);
+            crc32Distribution.put(partition, crc32Distribution.get(partition) 
+ 1);
+        }
+
+        log.info("MD5 Distribution (OLD - Has Issue):");
+        for (int i = 0; i < partitions; i++) {
+            int count = md5Distribution.get(i);
+            double percentage = (count * 100.0) / totalRecords;
+            log.info(
+                    String.format(
+                            "  Partition %d: %,7d records (%.2f%%)%s",
+                            i, count, percentage, (percentage > 20 ? " 
SKEWED!" : "")));
+        }
+
+        log.info("CRC32 Distribution (NEW - Fixed):");
+        for (int i = 0; i < partitions; i++) {
+            int count = crc32Distribution.get(i);
+            double percentage = (count * 100.0) / totalRecords;
+            log.info(
+                    String.format(
+                            "  Partition %d: %,7d records (%.2f%%)%s",
+                            i, count, percentage, (percentage > 20 ? " 
SKEWED!" : "")));
+        }
+
+        // Verify that MD5 is severely skewed
+        double md5Partition0Percentage = (md5Distribution.get(0) * 100.0) / 
totalRecords;
+        Assertions.assertTrue(md5Partition0Percentage > 30);
+
+        // Verify that CRC32 is evenly distributed
+        for (int i = 0; i < partitions; i++) {
+            double crc32Percentage = (crc32Distribution.get(i) * 100.0) / 
totalRecords;
+            Assertions.assertTrue(crc32Percentage >= 7 && crc32Percentage <= 
13);
+        }
+
+        double md5StdDev = calculateStandardDeviation(md5Distribution, 
totalRecords, partitions);
+        double crc32StdDev =
+                calculateStandardDeviation(crc32Distribution, totalRecords, 
partitions);
+
+        // The standard deviation of CRC32 should be much smaller than MD5
+        Assertions.assertTrue(crc32StdDev < md5StdDev / 2);
+    }
+
+    /** Generate Snowflake Algorithm ID */
+    private List<String> generateSnowflakeIds(int count) {
+        List<String> ids = new ArrayList<>(count);
+        long baseTimestamp = 1704067200000L;
+        long timestampBits = baseTimestamp << 22;
+
+        for (int i = 0; i < count; i++) {
+            long timeIncrement = (i / 4096) << 22;
+            long machineId = (i % 1024) << 12;
+            long sequence = i % 4096;
+
+            long snowflakeId = timestampBits + timeIncrement + machineId + 
sequence;
+            ids.add(String.valueOf(snowflakeId));
+        }
+
+        return ids;
+    }
+
+    /** Simulate the MD5 behavior of MySQL */
+    private int calculateMD5Partition(String id, int mod) {
+        try {
+            MessageDigest md = MessageDigest.getInstance("MD5");
+            byte[] digest = md.digest(id.getBytes());
+
+            StringBuilder hexString = new StringBuilder();
+            for (byte b : digest) {
+                String hex = Integer.toHexString(0xff & b);
+                if (hex.length() == 1) {
+                    hexString.append('0');
+                }
+                hexString.append(hex);
+            }
+
+            String hexResult = hexString.toString();
+            long numericValue = convertHexStringToNumberMySQLWay(hexResult);
+
+            return (int) Math.abs(numericValue % mod);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Simulate MySQL string to number conversion: Read from left to right and 
stop when the first
+     * non numeric character is encountered.
+     */
+    private long convertHexStringToNumberMySQLWay(String hexString) {
+        if (hexString == null || hexString.isEmpty()) {
+            return 0;
+        }
+
+        StringBuilder numericPart = new StringBuilder();
+        for (char c : hexString.toCharArray()) {
+            if (c >= '0' && c <= '9') {
+                numericPart.append(c);
+            } else {
+                break;
+            }
+        }
+
+        if (numericPart.length() == 0) {
+            return 0;
+        }
+
+        try {
+            return Long.parseLong(numericPart.toString());
+        } catch (NumberFormatException e) {
+            return 0;
+        }
+    }
+
+    /** Simulate CRC32 behavior */
+    private int calculateCRC32Partition(String id, int mod) {
+        CRC32 crc32 = new CRC32();
+        crc32.update(id.getBytes());
+        long crcValue = crc32.getValue();
+
+        return (int) Math.abs(crcValue % mod);
+    }
+
+    private double calculateStandardDeviation(
+            Map<Integer, Integer> distribution, int totalRecords, int 
partitions) {
+        double mean = totalRecords / (double) partitions;
+        double sumSquaredDiff = 0;
+
+        for (int i = 0; i < partitions; i++) {
+            double diff = distribution.get(i) - mean;
+            sumSquaredDiff += diff * diff;
+        }
+
+        return Math.sqrt(sumSquaredDiff / partitions);
+    }
+}

Reply via email to