Sahil333 opened a new issue, #18246: URL: https://github.com/apache/hudi/issues/18246
### Describe the problem you faced

I am writing a Spark job that reads a microbatch of 1M records from a Kafka topic and writes it to a Hudi table (backed by S3) that is partitioned by two columns of the input messages. In my performance test, I created record entries in Kafka such that there are ~100 unique values for column 1 and ~60 unique values for column 2. This results in ~6000 partitions for a given microbatch.

The issue is happening in the "SparkRDDWriteClient: Committing stats: <table>" job, which runs only 1 task for all ~6000 RDD partitions. Based on the executor logs, this task appears to be doing the small-file merge for the second batch (or file creation for the first batch). Essentially, all partitions are processed sequentially in this single task, pushing my batch processing time to ~50 minutes.

This behavior started after I upgraded from Hudi 0.15.0 to Hudi 1.1.0. With 0.15.0, and no pipeline code changes, each batch took ~4 minutes. It used to run a "Doing partitioning and writing data" job before "Committing stats", which ran 20 tasks; "Committing stats" itself also ran 20 tasks with Hudi 0.15.0.

I have tried playing around with multiple configurations, e.g. `*.shuffle.parallelism`, enabling AQE, and doing a `repartition` before calling `writeStream` on the Dataset read from Kafka, but nothing seems to affect the parallelism of the "Committing stats" job.

### To Reproduce

1. Create an input topic with ~5M messages. The message schema can have 3 columns: 1 record key and 2 partition columns. Make sure it has a similar distribution of unique values as described above.
2. Define a StreamingQuery pipeline in Java to read from Kafka and write to Hudi, partitioning on the 2 columns. Set `maxOffsetsPerTrigger` to 1M.
3. Run the Spark job with 5 executors, each with 4 CPUs (adjust accordingly; it shouldn't matter much).
4. Check the Spark UI for the "Committing stats" job. It should look like mine: running only 1 task.
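For reference, the knobs I experimented with look roughly like the sketch below. All values and the application jar name are illustrative, not my exact submission; the config keys themselves are the standard Spark and Hudi ones.

```shell
# Illustrative spark-submit fragment (values are examples only).
# None of these settings changed the "Committing stats" task count for me.
spark-submit \
  --conf spark.sql.shuffle.partitions=200 \
  --conf spark.default.parallelism=20 \
  --conf spark.sql.adaptive.enabled=true \
  my-streaming-app.jar   # hypothetical application jar

# Hudi write options set on the stream via .option(...):
#   hoodie.upsert.shuffle.parallelism=20
#   hoodie.insert.shuffle.parallelism=20
```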
### Expected behavior

It should have parallelized the "Committing stats" job across the ~6000 partitions on all the available executor cores.

### Environment Description

* Hudi version: 1.1.0
* Spark version: 3.5.6
* Flink version:
* Hive version:
* Hadoop version: 3.4.1
* Storage (HDFS/S3/GCS..): S3
* Running on Docker? (yes/no): yes (Kubernetes)

### Additional context

Logs from the executor running that task:

```
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.spark.storage.memory.MemoryStore - Block rdd_210_5848 stored as values in memory (estimated size 357.0 B, free 6.1 GiB)
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 7 (16.9 KiB) non-empty blocks including 2 (4.8 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 5 (12.1 KiB) remote blocks
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 4 remote fetches in 0 ms
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor - Merging updates for commit 20260225132240337 for file 932d1629-b29b-4867-93f3-d6e870ddc379-0
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor - Small file corrections for updates for commit 20260225132240337 for file 932d1629-b29b-4867-93f3-d6e870ddc379-0
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.io.HoodieMergeHandleFactory - Create HoodieMergeHandle implementation org.apache.hudi.io.FileGroupReaderBasedMergeHandle for fileId 932d1629-b29b-4867-93f3-d6e870ddc379-0 and partition path 2020-05/php at commit 20260225132240337
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.io.HoodieAbstractMergeHandle - partitionPath:2020-05/php, targetFileId to be merged: 932d1629-b29b-4867-93f3-d6e870ddc379-0
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.io.HoodieAbstractMergeHandle - Merging new data into oldPath: s3a://concentric-perf-scale/lakehouse/sahil/data/tenant_123/content_partitioned/2bcd3a0a-8937-4575-8d78-11f875178763/1234/2020-05/php/932d1629-b29b-4867-93f3-d6e870ddc379-0_0-46-25371_20260225122743157.parquet, as newPath: s3a://concentric-perf-scale/lakehouse/sahil/data/tenant_123/content_partitioned/2bcd3a0a-8937-4575-8d78-11f875178763/1234/2020-05/php/932d1629-b29b-4867-93f3-d6e870ddc379-0_0-69-44078_20260225132240337.parquet
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers - [timeline-server-based] Created marker file 2020-05/php/932d1629-b29b-4867-93f3-d6e870ddc379-0_0-69-44078_20260225132240337.parquet.marker.MERGE in 65 ms
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - RecordReader initialized will read a total of 13 records.
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - at row 0. reading next block
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 18 ms. row count = 13
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.common.util.collection.ExternalSpillableMap - KeyBasedFileGroupRecordBuffer : Total entries in InMemory map 9, with average record size as 832, currentInMemoryMapSize 7488. No entries were spilled to disk.
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.io.HoodieWriteMergeHandle - MergeHandle for partitionPath 2020-05/php fileID 932d1629-b29b-4867-93f3-d6e870ddc379-0, took 468 ms.
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.spark.storage.memory.MemoryStore - Block rdd_210_5849 stored as values in memory (estimated size 357.0 B, free 6.1 GiB)
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 11 (26.6 KiB) non-empty blocks including 2 (4.9 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 9 (21.8 KiB) remote blocks
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 3 remote fetches in 0 ms
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor - Merging updates for commit 20260225132240337 for file 9b2bd8ab-1746-4f19-a45b-1bad58fcbfa4-0
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor - Small file corrections for updates for commit 20260225132240337 for file 9b2bd8ab-1746-4f19-a45b-1bad58fcbfa4-0
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.io.HoodieMergeHandleFactory - Create HoodieMergeHandle implementation org.apache.hudi.io.FileGroupReaderBasedMergeHandle for fileId 9b2bd8ab-1746-4f19-a45b-1bad58fcbfa4-0 and partition path 2019-11/svg at commit 20260225132240337
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.io.HoodieAbstractMergeHandle - partitionPath:2019-11/svg, targetFileId to be merged: 9b2bd8ab-1746-4f19-a45b-1bad58fcbfa4-0
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.io.HoodieAbstractMergeHandle - Merging new data into oldPath: s3a://concentric-perf-scale/lakehouse/sahil/data/tenant_123/content_partitioned/2bcd3a0a-8937-4575-8d78-11f875178763/1234/2019-11/svg/9b2bd8ab-1746-4f19-a45b-1bad58fcbfa4-0_0-46-25371_20260225122743157.parquet, as newPath: s3a://concentric-perf-scale/lakehouse/sahil/data/tenant_123/content_partitioned/2bcd3a0a-8937-4575-8d78-11f875178763/1234/2019-11/svg/9b2bd8ab-1746-4f19-a45b-1bad58fcbfa4-0_0-69-44078_20260225132240337.parquet
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers - [timeline-server-based] Created marker file 2019-11/svg/9b2bd8ab-1746-4f19-a45b-1bad58fcbfa4-0_0-69-44078_20260225132240337.parquet.marker.MERGE in 93 ms
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - RecordReader initialized will read a total of 18 records.
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - at row 0. reading next block
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 56 ms. row count = 18
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.common.util.collection.ExternalSpillableMap - KeyBasedFileGroupRecordBuffer : Total entries in InMemory map 12, with average record size as 824, currentInMemoryMapSize 9888. No entries were spilled to disk.
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.io.HoodieWriteMergeHandle - MergeHandle for partitionPath 2019-11/svg fileID 9b2bd8ab-1746-4f19-a45b-1bad58fcbfa4-0, took 514 ms.
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.spark.storage.memory.MemoryStore - Block rdd_210_5850 stored as values in memory (estimated size 358.0 B, free 6.1 GiB)
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 48 (116.1 KiB) non-empty blocks including 10 (24.3 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 38 (91.8 KiB) remote blocks
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 4 remote fetches in 0 ms
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor - Merging updates for commit 20260225132240337 for file aad91df9-070e-48ac-b12f-eec36c913560-0
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor - Small file corrections for updates for commit 20260225132240337 for file aad91df9-070e-48ac-b12f-eec36c913560-0
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.io.HoodieMergeHandleFactory - Create HoodieMergeHandle implementation org.apache.hudi.io.FileGroupReaderBasedMergeHandle for fileId aad91df9-070e-48ac-b12f-eec36c913560-0 and partition path 2025-01/pptx at commit 20260225132240337
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.io.HoodieAbstractMergeHandle - partitionPath:2025-01/pptx, targetFileId to be merged: aad91df9-070e-48ac-b12f-eec36c913560-0
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.io.HoodieAbstractMergeHandle - Merging new data into oldPath: s3a://concentric-perf-scale/lakehouse/sahil/data/tenant_123/content_partitioned/2bcd3a0a-8937-4575-8d78-11f875178763/1234/2025-01/pptx/aad91df9-070e-48ac-b12f-eec36c913560-0_0-46-25371_20260225122743157.parquet, as newPath: s3a://concentric-perf-scale/lakehouse/sahil/data/tenant_123/content_partitioned/2bcd3a0a-8937-4575-8d78-11f875178763/1234/2025-01/pptx/aad91df9-070e-48ac-b12f-eec36c913560-0_0-69-44078_20260225132240337.parquet
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers - [timeline-server-based] Created marker file 2025-01/pptx/aad91df9-070e-48ac-b12f-eec36c913560-0_0-69-44078_20260225132240337.parquet.marker.MERGE in 110 ms
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.common.util.collection.ExternalSpillableMap - KeyBasedFileGroupRecordBuffer : Updated Estimated Payload size 827
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - RecordReader initialized will read a total of 241 records.
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - at row 0. reading next block
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 26 ms. row count = 241
[Executor task launch worker for task 0.0 in stage 69.0 (TID 44078)] INFO org.apache.hudi.common.util.collection.ExternalSpillableMap - KeyBasedFileGroupRecordBuffer : Total entries in InMemory map 125, with average record size as 827, currentInMemoryMapSize 103375. No entries were spilled to disk.
```
Attaching screenshots from the Spark UI:

<img width="1948" height="817" alt="Image" src="https://github.com/user-attachments/assets/fc19b0e3-b8df-44c9-945a-ad30c890da44" />
<img width="1877" height="945" alt="Image" src="https://github.com/user-attachments/assets/22094545-8e12-4389-8737-23c93dc8be73" />
<img width="1898" height="954" alt="Image" src="https://github.com/user-attachments/assets/9e40ddd3-0726-4e30-ad3a-0fc4de1e59cf" />
<img width="1908" height="784" alt="Image" src="https://github.com/user-attachments/assets/7733ca19-6929-4e86-aeb7-76e770b0674c" />
<img width="1169" height="943" alt="Image" src="https://github.com/user-attachments/assets/4ba80064-a4b1-4aba-a6e7-fbbfd7f1b52f" />
<img width="1916" height="953" alt="Image" src="https://github.com/user-attachments/assets/dff59e18-354a-4394-9809-e1b8a3682f17" />
<img width="1907" height="955" alt="Image" src="https://github.com/user-attachments/assets/119ea0fe-31bd-4d16-9ca0-e1217a042778" />

### Stacktrace

```shell

```
