This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d5847347de [Improve][Connector-v2][MySQL] Optimize shard calculation strategy (#9975)
d5847347de is described below
commit d5847347de25225b5eb3e379a4a2001b959daee3
Author: Jast <[email protected]>
AuthorDate: Fri Nov 21 16:12:46 2025 +0800
[Improve][Connector-v2][MySQL] Optimize shard calculation strategy (#9975)
---
.../jdbc/internal/dialect/mysql/MysqlDialect.java | 5 +
.../internal/dialect/mysql/MysqlDialectTest.java | 191 +++++++++++++++++++++
2 files changed, 196 insertions(+)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 88ad8f3d8e..8bf1edc331 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -137,6 +137,11 @@ public class MysqlDialect implements JdbcDialect {
return map;
}
+ @Override
+ public String hashModForField(String fieldName, int mod) {
+ return "ABS(CRC32(" + quoteIdentifier(fieldName) + ") % " + mod + ")";
+ }
+
@Override
public TablePath parse(String tablePath) {
return TablePath.of(tablePath, false);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialectTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialectTest.java
new file mode 100644
index 0000000000..51fe7d57b6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialectTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.CRC32;
+
+@Slf4j
+public class MysqlDialectTest {
+
+ @Test
+ public void testHashDistributionMD5vsCRC32WithSnowflakeIds() {
+ int totalRecords = 1_100_000;
+ int partitions = 10;
+ List<String> snowflakeIds = generateSnowflakeIds(totalRecords);
+
+ Map<Integer, Integer> md5Distribution = new HashMap<>();
+ for (int i = 0; i < partitions; i++) {
+ md5Distribution.put(i, 0);
+ }
+
+ for (String id : snowflakeIds) {
+ int partition = calculateMD5Partition(id, partitions);
+ md5Distribution.put(partition, md5Distribution.get(partition) + 1);
+ }
+
+ Map<Integer, Integer> crc32Distribution = new HashMap<>();
+ for (int i = 0; i < partitions; i++) {
+ crc32Distribution.put(i, 0);
+ }
+
+ for (String id : snowflakeIds) {
+ int partition = calculateCRC32Partition(id, partitions);
+ crc32Distribution.put(partition, crc32Distribution.get(partition)
+ 1);
+ }
+
+ log.info("MD5 Distribution (OLD - Has Issue):");
+ for (int i = 0; i < partitions; i++) {
+ int count = md5Distribution.get(i);
+ double percentage = (count * 100.0) / totalRecords;
+ log.info(
+ String.format(
+ " Partition %d: %,7d records (%.2f%%)%s",
+ i, count, percentage, (percentage > 20 ? "
SKEWED!" : "")));
+ }
+
+ log.info("CRC32 Distribution (NEW - Fixed):");
+ for (int i = 0; i < partitions; i++) {
+ int count = crc32Distribution.get(i);
+ double percentage = (count * 100.0) / totalRecords;
+ log.info(
+ String.format(
+ " Partition %d: %,7d records (%.2f%%)%s",
+ i, count, percentage, (percentage > 20 ? "
SKEWED!" : "")));
+ }
+
+ // Verify that MD5 is severely skewed
+ double md5Partition0Percentage = (md5Distribution.get(0) * 100.0) /
totalRecords;
+ Assertions.assertTrue(md5Partition0Percentage > 30);
+
+ // Verify that CRC32 is evenly distributed
+ for (int i = 0; i < partitions; i++) {
+ double crc32Percentage = (crc32Distribution.get(i) * 100.0) /
totalRecords;
+ Assertions.assertTrue(crc32Percentage >= 7 && crc32Percentage <=
13);
+ }
+
+ double md5StdDev = calculateStandardDeviation(md5Distribution,
totalRecords, partitions);
+ double crc32StdDev =
+ calculateStandardDeviation(crc32Distribution, totalRecords,
partitions);
+
+ // The standard deviation of CRC32 should be much smaller than MD5
+ Assertions.assertTrue(crc32StdDev < md5StdDev / 2);
+ }
+
+ /** Generate Snowflake Algorithm ID */
+ private List<String> generateSnowflakeIds(int count) {
+ List<String> ids = new ArrayList<>(count);
+ long baseTimestamp = 1704067200000L;
+ long timestampBits = baseTimestamp << 22;
+
+ for (int i = 0; i < count; i++) {
+ long timeIncrement = (i / 4096) << 22;
+ long machineId = (i % 1024) << 12;
+ long sequence = i % 4096;
+
+ long snowflakeId = timestampBits + timeIncrement + machineId +
sequence;
+ ids.add(String.valueOf(snowflakeId));
+ }
+
+ return ids;
+ }
+
+ /** Simulate the MD5 behavior of MySQL */
+ private int calculateMD5Partition(String id, int mod) {
+ try {
+ MessageDigest md = MessageDigest.getInstance("MD5");
+ byte[] digest = md.digest(id.getBytes());
+
+ StringBuilder hexString = new StringBuilder();
+ for (byte b : digest) {
+ String hex = Integer.toHexString(0xff & b);
+ if (hex.length() == 1) {
+ hexString.append('0');
+ }
+ hexString.append(hex);
+ }
+
+ String hexResult = hexString.toString();
+ long numericValue = convertHexStringToNumberMySQLWay(hexResult);
+
+ return (int) Math.abs(numericValue % mod);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Simulate MySQL string to number conversion: Read from left to right and
stop when the first
+ * non numeric character is encountered.
+ */
+ private long convertHexStringToNumberMySQLWay(String hexString) {
+ if (hexString == null || hexString.isEmpty()) {
+ return 0;
+ }
+
+ StringBuilder numericPart = new StringBuilder();
+ for (char c : hexString.toCharArray()) {
+ if (c >= '0' && c <= '9') {
+ numericPart.append(c);
+ } else {
+ break;
+ }
+ }
+
+ if (numericPart.length() == 0) {
+ return 0;
+ }
+
+ try {
+ return Long.parseLong(numericPart.toString());
+ } catch (NumberFormatException e) {
+ return 0;
+ }
+ }
+
+ /** Simulate CRC32 behavior */
+ private int calculateCRC32Partition(String id, int mod) {
+ CRC32 crc32 = new CRC32();
+ crc32.update(id.getBytes());
+ long crcValue = crc32.getValue();
+
+ return (int) Math.abs(crcValue % mod);
+ }
+
+ private double calculateStandardDeviation(
+ Map<Integer, Integer> distribution, int totalRecords, int
partitions) {
+ double mean = totalRecords / (double) partitions;
+ double sumSquaredDiff = 0;
+
+ for (int i = 0; i < partitions; i++) {
+ double diff = distribution.get(i) - mean;
+ sumSquaredDiff += diff * diff;
+ }
+
+ return Math.sqrt(sumSquaredDiff / partitions);
+ }
+}