Re: [I] [SUPPORT] Flink bucket index partitioner may cause data skew [hudi]

2024-05-29 Thread via GitHub


xicm closed issue #11288: [SUPPORT] Flink bucket index partitioner may cause 
data skew
URL: https://github.com/apache/hudi/issues/11288


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [SUPPORT] Flink bucket index partitioner may cause data skew [hudi]

2024-05-27 Thread via GitHub


eric9204 commented on issue #11288:
URL: https://github.com/apache/hudi/issues/11288#issuecomment-2132903234

   This issue comes down to hash collisions and how the modulo operation groups values.
   
   First, bucket identifiers are consecutive, while partition paths may be sequential or effectively random. Ideally, each combination of partition and bucket would map to a distinct hash value. With a weak hash function, however, taking the result modulo the parallelism raises the likelihood of collisions, so after the modulo the data is spread unevenly across subtasks.
   
   Second, at its core the modulo operation maps the combined hash of partition path and bucket into the contiguous range 0 to `parallelism - 1`. When the parallelism is a prime number, the modulo mitigates uneven distribution somewhat, because a prime modulus shares no common factor with the stride between successive hash values and therefore produces fewer repeated residues. In practical deployments, though, the parallelism is rarely prime, so hash values with the same remainder are all grouped into the same subtask, which amplifies the skew.
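   A quick way to see the prime-vs-composite effect (a standalone sketch, not Hudi code; the base hash `100` and stride `4` are arbitrary illustrative values):
   
   ```java
   import java.util.Set;
   import java.util.TreeSet;
   
   public class ResidueDemo {
   
       // Distinct subtask indices hit by hashes base, base+stride, base+2*stride, ...
       static Set<Integer> residues(int base, int stride, int steps, int parallelism) {
           Set<Integer> out = new TreeSet<>();
           for (int k = 0; k < steps; k++) {
               out.add((base + k * stride) % parallelism);
           }
           return out;
       }
   
       public static void main(String[] args) {
           // Stride 4 vs. parallelism 8 (common factor 4): only 2 subtasks used.
           System.out.println(residues(100, 4, 8, 8)); // [0, 4]
           // Stride 4 vs. parallelism 7 (prime): all 7 subtasks used.
           System.out.println(residues(100, 4, 8, 7)); // [0, 1, 2, 3, 4, 5, 6]
       }
   }
   ```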





Re: [I] [SUPPORT] Flink bucket index partitioner may cause data skew [hudi]

2024-05-26 Thread via GitHub


danny0405 commented on issue #11288:
URL: https://github.com/apache/hudi/issues/11288#issuecomment-2132172783

   I guess you are saying `(Hash(partition) % parallelism * bucket_num + bucket_id) % parallelism`?





Re: [I] [SUPPORT] Flink bucket index partitioner may cause data skew [hudi]

2024-05-24 Thread via GitHub


danny0405 commented on issue #11288:
URL: https://github.com/apache/hudi/issues/11288#issuecomment-2130654910

   The `parallelism/bucket_num` could be 0 when parallelism < bucket_num.
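   For example (a hypothetical snippet; Java integer division truncates toward zero):
   
   ```java
   public class IntDivDemo {
       public static void main(String[] args) {
           int parallelism = 8;
           int bucketNum = 20;
           // 8 / 20 truncates to 0, so any step size derived this way collapses.
           System.out.println(parallelism / bucketNum); // 0
       }
   }
   ```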





Re: [I] [SUPPORT] Flink bucket index partitioner may cause data skew [hudi]

2024-05-24 Thread via GitHub


xicm commented on issue #11288:
URL: https://github.com/apache/hudi/issues/11288#issuecomment-2129894719

   **(Hash(partition) % parallelism * bucket_num + bucket_num) % parallelism** seems like a good way.
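   A quick way to check the distribution such a formula yields (a standalone sketch; here the trailing `bucket_num` term is read as the per-record bucket id, and `part1`/`part2` are placeholder partition names):
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;
   
   public class ProposedPartitioner {
   
       // Hypothetical helper: subtask chosen by the proposed formula.
       static int subtask(String partition, int bucketId, int bucketNum, int parallelism) {
           int p = (partition.hashCode() & Integer.MAX_VALUE) % parallelism;
           return (p * bucketNum + bucketId) % parallelism;
       }
   
       public static void main(String[] args) {
           int bucketNum = 4, parallelism = 8;
           // Count how many buckets each subtask receives across two partitions.
           Map<Integer, Integer> load = new TreeMap<>();
           for (String part : new String[]{"part1", "part2"}) {
               for (int b = 0; b < bucketNum; b++) {
                   load.merge(subtask(part, b, bucketNum, parallelism), 1, Integer::sum);
               }
           }
           System.out.println(load);
       }
   }
   ```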





Re: [I] [SUPPORT] Flink bucket index partitioner may cause data skew [hudi]

2024-05-24 Thread via GitHub


xicm commented on issue #11288:
URL: https://github.com/apache/hudi/issues/11288#issuecomment-2129890980

   Some tests:
   ```
   package org.example;
   
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;
   
   public class Test {
   
       private static final int BUCKET_NUM = 20;
       private static final String[] PARTITIONS = {"part1", "part2"};
   
       // Same non-negative hash the partitioner derives from the partition path.
       private static int hash(String partition) {
           return partition.hashCode() & Integer.MAX_VALUE;
       }
   
       // Sorts and prints the subtask indices produced by one scheme.
       private static void report(String label, List<Integer> indices) {
           System.out.println(label);
           indices.sort(Comparator.naturalOrder());
           for (int n : indices) {
               System.out.print(n + " ");
           }
           System.out.println();
       }
   
       public static void main(String[] args) {
           // 1. bucket_id % parallelism, parallelism = 20
           List<Integer> list1 = new ArrayList<>();
           for (String part : PARTITIONS) {
               for (int i = 0; i < BUCKET_NUM; i++) {
                   list1.add(i % 20);
               }
           }
           report("bucket_num % parallelism, parallelism = 20", list1);
   
           // 2. Hash(partition + bucket_id) % parallelism, parallelism = 20
           List<Integer> list2 = new ArrayList<>();
           for (String part : PARTITIONS) {
               for (int i = 0; i < BUCKET_NUM; i++) {
                   list2.add(hash(part + i) % 20);
               }
           }
           report("Hash(partition + bucket_num) % parallelism, parallelism = 20", list2);
   
           // 3. (Hash(partition) + bucket_id) % parallelism, parallelism = 20
           List<Integer> list3 = new ArrayList<>();
           for (String part : PARTITIONS) {
               for (int i = 0; i < BUCKET_NUM; i++) {
                   list3.add((hash(part) + i) % 20);
               }
           }
           report("(Hash(partition) + bucket_num) % parallelism, parallelism = 20", list3);
   
           // 4. (Hash(partition) % parallelism + bucket_id) % parallelism, parallelism = 20
           List<Integer> list4 = new ArrayList<>();
           for (String part : PARTITIONS) {
               for (int i = 0; i < BUCKET_NUM; i++) {
                   list4.add((hash(part) % 20 + i) % 20);
               }
           }
           report("(Hash(partition) % parallelism + bucket_num) % parallelism, parallelism = 20", list4);
   
           // 5. Same formula as 4, but parallelism = 40 while bucket_num stays 20
           List<Integer> list5 = new ArrayList<>();
           for (String part : PARTITIONS) {
               for (int i = 0; i < BUCKET_NUM; i++) {
                   list5.add((hash(part) % 40 + i) % 40);
               }
           }
           report("(Hash(partition) % parallelism + bucket_num) % parallelism, parallelism = 40", list5);
   
           // 6. (Hash(partition) % parallelism * bucket_num + bucket_id) % parallelism, parallelism = 40
           List<Integer> list6 = new ArrayList<>();
           for (String part : PARTITIONS) {
               for (int i = 0; i < BUCKET_NUM; i++) {
                   list6.add((hash(part) % 40 * BUCKET_NUM + i) % 40);
               }
           }
           report("(Hash(partition) % parallelism * bucket_num + bucket_num) % parallelism, parallelism = 40", list6);
       }
   }
   ```
   
   
![image](https://github.com/apache/hudi/assets/36392121/ae1f9588-9eff-4560-94ed-f6e59fe39b57)
   
   





Re: [I] [SUPPORT] Flink bucket index partitioner may cause data skew [hudi]

2024-05-24 Thread via GitHub


xicm commented on issue #11288:
URL: https://github.com/apache/hudi/issues/11288#issuecomment-2129624423

   I think we can save each partition+bucket_number combination to an ArrayList and make sure the values in the ArrayList are unique.
   We then get the index of a record's partition+bucket_number from the ArrayList and use that index modulo the parallelism as the shuffle index.
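   A minimal sketch of that idea (class and method names are hypothetical; note that the first-seen list would also have to stay consistent across parallel operator instances and restarts for the routing to be stable):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class ListBasedPartitioner {
   
       // First-seen (partition, bucket) keys, in arrival order, kept unique.
       private final List<String> seen = new ArrayList<>();
       private final int parallelism;
   
       ListBasedPartitioner(int parallelism) {
           this.parallelism = parallelism;
       }
   
       // Position of the key in the list, modulo parallelism, is the subtask.
       int subtask(String partition, int bucketId) {
           String key = partition + "_" + bucketId;
           int idx = seen.indexOf(key);
           if (idx < 0) {
               seen.add(key);
               idx = seen.size() - 1;
           }
           return idx % parallelism;
       }
   
       public static void main(String[] args) {
           ListBasedPartitioner p = new ListBasedPartitioner(8);
           // Two partitions x four buckets fill subtasks 0..7 exactly once.
           for (String part : new String[]{"part1", "part2"}) {
               for (int b = 0; b < 4; b++) {
                   System.out.print(p.subtask(part, b) + " ");
               }
           }
           System.out.println(); // 0 1 2 3 4 5 6 7
       }
   }
   ```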





[I] [SUPPORT] Flink bucket index partitioner may cause data skew [hudi]

2024-05-24 Thread via GitHub


xicm opened a new issue, #11288:
URL: https://github.com/apache/hudi/issues/11288

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at 
dev-subscr...@hudi.apache.org.
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   https://github.com/apache/hudi/pull/7815 works well for a single partition 
sink.
   
   
https://github.com/apache/hudi/blob/bcc1f8de4d9d0ac8c3cf6c51bb0a802ef2bcee20/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java#L42-L48
   
   If we write two or more partitions in a batch, the partitioner may cause data skew.
   
   For example, we set the write parallelism to 8 and the bucket num to 4, and we write two partitions in one batch.
   Suppose that the partition hashes modulo the parallelism are 1 and 2.
   
   partition1:  partitionIndex = 1,  bucket ids = [0, 1, 2, 3]
   partition2:  partitionIndex = 2,  bucket ids = [0, 1, 2, 3]
   
   The globalIndex for each bucket (`int globalIndex = partitionIndex + curBucket;`):
   partition1:  globalIndex = [1, 2, 3, 4]
   partition2:  globalIndex = [2, 3, 4, 5]
   
   then mod 8:
   partition1:  [1, 2, 3, 4]
   partition2:  [2, 3, 4, 5]
   
   The partition result is [1, 2, 3, 4, 5]: our parallelism is 8, but only 5 subtasks are used, and subtasks 2, 3, and 4 each receive two buckets' data.
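   The walk-through above can be reproduced with a small standalone program (class and method names are illustrative):
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;
   
   public class SkewDemo {
   
       // Counts how many buckets land on each subtask under
       // globalIndex = partitionIndex + curBucket, then modulo parallelism.
       static Map<Integer, Integer> load(int[] partitionIndexes, int bucketNum, int parallelism) {
           Map<Integer, Integer> counts = new TreeMap<>();
           for (int p : partitionIndexes) {
               for (int bucket = 0; bucket < bucketNum; bucket++) {
                   counts.merge((p + bucket) % parallelism, 1, Integer::sum);
               }
           }
           return counts;
       }
   
       public static void main(String[] args) {
           // The example above: parallelism 8, bucket num 4, partition indexes 1 and 2.
           // Only 5 of the 8 subtasks are used; 2, 3 and 4 each get two buckets.
           System.out.println(load(new int[]{1, 2}, 4, 8)); // {1=1, 2=2, 3=2, 4=2, 5=1}
       }
   }
   ```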
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1.
   2.
   3.
   4.
   
   **Expected behavior**
   
   A clear and concise description of what you expected to happen.
   
   **Environment Description**
   
   * Hudi version :
   
   * Spark version :
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) :
   
   * Running on Docker? (yes/no) :
   
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   ```Add the stacktrace of the error.```
   
   

