This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 680aad70c3 [core] Start channel should not be negative when partition.hashCode() equals to Integer.MIN_VALUE (#5623)
680aad70c3 is described below

commit 680aad70c3f50cb9293f91fb75eb773389475320
Author: tsreaper <[email protected]>
AuthorDate: Mon May 19 17:36:42 2025 +0800

    [core] Start channel should not be negative when partition.hashCode() equals to Integer.MIN_VALUE (#5623)
---
 .../org/apache/paimon/table/sink/ChannelComputer.java   | 17 +++++++++++++++--
 .../postpone/PostponeBucketCompactSplitSource.java      |  9 ++-------
 .../flink/sink/PostponeBucketChannelComputer.java       |  5 +++--
 3 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java
index 52f1dda36b..6bac8b3a8a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java
@@ -35,14 +35,27 @@ public interface ChannelComputer<T> extends Serializable {
     int channel(T record);
 
     static int select(BinaryRow partition, int bucket, int numChannels) {
-        int startChannel = Math.abs(partition.hashCode()) % numChannels;
-        return (startChannel + bucket) % numChannels;
+        return (startChannel(partition, numChannels) + bucket) % numChannels;
     }
 
     static int select(int bucket, int numChannels) {
         return bucket % numChannels;
     }
 
+    static int startChannel(BinaryRow partition, int numChannels) {
+        int hashCode = partition.hashCode();
+        if (hashCode == Integer.MIN_VALUE) {
+            hashCode = Integer.MAX_VALUE;
+        }
+        // Due to backward compatibility (Flink users may recover from state),
+        // we need to use this formula.
+        // However, if hashCode equals Integer.MIN_VALUE,
+        // Math.abs will still return Integer.MIN_VALUE,
+        // and this formula will produce a negative integer.
+        // So we specially handle this case above.
+        return Math.abs(hashCode) % numChannels;
+    }
+
     static <T, R> ChannelComputer<R> transform(
             ChannelComputer<T> input, SerializableFunction<R, T> converter) {
         return new ChannelComputer<R>() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index 1cab64c2ca..53bc7a50a7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -186,13 +186,8 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
                     matcher.find(),
                     "Data file name %s does not match the pattern. This is 
unexpected.",
                     fileName);
-
-            // use long to avoid overflow
-            long subtaskId = Long.parseLong(matcher.group(1));
-            // send records written by the same subtask to the same subtask
-            // to make sure we replay the written records in the exact order
-            long channel = (Math.abs(dataSplit.partition().hashCode()) + 
subtaskId) % numChannels;
-            return (int) channel;
+            return ChannelComputer.select(
+                    dataSplit.partition(), Integer.parseInt(matcher.group(1)), 
numChannels);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketChannelComputer.java
index ca2bc17a0e..af56fef8b7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketChannelComputer.java
@@ -50,7 +50,8 @@ public class PostponeBucketChannelComputer implements ChannelComputer<InternalRo
     @Override
     public int channel(InternalRow record) {
         extractor.setRecord(record);
-        return Math.abs(extractor.partition().hashCode() + 
extractor.trimmedPrimaryKey().hashCode())
-                % numChannels;
+        return Math.abs(
+                (extractor.partition().hashCode() + 
extractor.trimmedPrimaryKey().hashCode())
+                        % numChannels);
     }
 }

Reply via email to