andygrove opened a new issue, #3057:
URL: https://github.com/apache/datafusion-comet/issues/3057

   ### What is the problem the feature request solves?
   
   We have seen poor performance with JVM shuffle in some cases, especially 
when complex types are involved. I asked Claude to take a look, and it came up 
with some ideas that I plan to explore.
   
   ### What Claude Said
   
   Based on my review, here are the significant performance issues I found in 
the sort-based shuffle implementation:
   
     Performance Analysis: Sort-Based Shuffle
   
     1. Per-Partition JNI Calls (Critical)
   
     Location: CometShuffleExternalSorter.java:538-600
   
     while (sortedRecords.hasNext()) {
         // ...
         if (partition != currentPartition) {
             if (currentPartition != -1) {
                 // JNI call for EACH partition
                  long written = doSpilling(dataTypes, spillInfo.file, rowPartition, ...);
             }
             currentPartition = partition;
         }
         rowPartition.addRow(...);
     }
   
     With 200 partitions, this makes ~200 separate JNI calls to 
writeSortedFileNative. Each call:
     - Deserializes schema (converts Array[Array[Byte]] to Rust types)
     - Gets JNI array elements
     - Creates Arrow builders
     - Opens file, writes, closes
   
     Impact: JNI overhead multiplied by partition count. With many partitions, 
this dominates execution time.
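      One way to batch the partition writes (recommended fix 1 below) can be 
sketched as a single native entry point that receives all sorted row addresses 
plus partition boundary offsets, so schema deserialization, builder setup, and 
file handling happen once per spill instead of once per partition. The JNI 
plumbing is omitted and all names here are illustrative, not the actual Comet 
API:

```rust
// Hypothetical sketch: one native call handles ALL partitions.
// `partition_starts` has length num_partitions + 1, so partition p's rows
// are row_addrs[partition_starts[p]..partition_starts[p + 1]].
fn write_all_partitions(
    row_addrs: &[u64],
    partition_starts: &[usize],
) -> Vec<(usize, usize)> {
    // Schema deserialization, builder creation, and file open would happen
    // ONCE here, rather than inside a per-partition JNI call.
    let mut written = Vec::new();
    for p in 0..partition_starts.len() - 1 {
        let (start, end) = (partition_starts[p], partition_starts[p + 1]);
        if start == end {
            continue; // empty partition: nothing to write
        }
        // In real code this would build Arrow batches from
        // row_addrs[start..end] and append them to the open output file.
        let _rows = &row_addrs[start..end];
        written.push((p, end - start));
    }
    written
}

fn main() {
    // Three partitions over six rows: sizes 2, 0, 4.
    let addrs: Vec<u64> = (0..6).map(|i| 0x1000 + i * 64).collect();
    let written = write_all_partitions(&addrs, &[0, 2, 2, 6]);
    assert_eq!(written, vec![(0, 2), (2, 4)]);
}
```

      With this shape, the JNI cost is paid once per spill regardless of the 
number of partitions.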
   
     ---
     2. RowPartition Array Allocation Per Partition
   
     Location: RowPartition.scala:39-54
   
     def getRowAddresses: Array[Long] = {
       val array = rowAddresses.toArray  // Creates NEW array copy
       rowAddresses = null
       array
     }
   
     def reset(): Unit = {
       rowAddresses = new ArrayBuffer[Long](initialSize)  // NEW ArrayBuffer
       rowSizes = new ArrayBuffer[Int](initialSize)
     }
   
      For each partition boundary:
      1. toArray allocates a new array and copies all elements
      2. reset() creates new ArrayBuffers

      Impact: O(partitions) ArrayBuffer allocations plus O(total rows) element copies per spill.
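      The cursor-based alternative (recommended fix 4 below) might look roughly 
like this; the real RowPartition is Scala, so this is an illustrative Rust 
sketch of the same pattern. Draining a partition hands back a borrowed slice 
instead of a toArray copy, and reset() rewinds a cursor instead of allocating 
fresh ArrayBuffers:

```rust
// Sketch: preallocated primitive buffer with a cursor, reused across
// partitions. Names mirror RowPartition.scala but are illustrative.
struct RowPartition {
    row_addresses: Vec<u64>,
    cursor: usize,
}

impl RowPartition {
    fn with_capacity(initial_size: usize) -> Self {
        Self { row_addresses: vec![0; initial_size], cursor: 0 }
    }

    fn add_row(&mut self, addr: u64) {
        if self.cursor == self.row_addresses.len() {
            // Grow geometrically: amortized O(1) per row.
            let new_len = (self.row_addresses.len() * 2).max(4);
            self.row_addresses.resize(new_len, 0);
        }
        self.row_addresses[self.cursor] = addr;
        self.cursor += 1;
    }

    // Borrow the filled prefix: no copy, no new allocation.
    fn row_addresses(&self) -> &[u64] {
        &self.row_addresses[..self.cursor]
    }

    // Reuse the backing buffer for the next partition.
    fn reset(&mut self) {
        self.cursor = 0;
    }
}

fn main() {
    let mut part = RowPartition::with_capacity(2);
    part.add_row(0x1000);
    part.add_row(0x1040);
    part.add_row(0x1080); // triggers one geometric grow
    assert_eq!(part.row_addresses(), &[0x1000, 0x1040, 0x1080]);
    part.reset(); // buffer retained, not reallocated
    assert_eq!(part.row_addresses().len(), 0);
}
```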
   
     ---
     3. File Open/Close Per Batch in Native Code
   
     Location: row.rs:826-831
   
     while current_row < row_num {
         // ... build batch ...
   
         // Opens file on EVERY batch iteration
         let mut output_data = OpenOptions::new()
             .create(true)
             .append(true)
             .open(&output_path)?;
         output_data.write_all(&frozen)?;
         // File closed here (drop)
     }
   
     Impact: Repeated syscalls for file open/close on every batch within every 
partition.
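      Recommended fix 2 could look roughly like this: open the output once 
before the batch loop and keep a buffered writer alive across batches. 
`frozen_batches` stands in for the per-batch encoded bytes produced inside the 
real loop; the function name is illustrative:

```rust
use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::path::Path;

// Sketch: one open/close pair per partition instead of one per batch.
fn write_batches(output_path: &Path, frozen_batches: &[Vec<u8>]) -> std::io::Result<u64> {
    // Opened ONCE, before the batch loop.
    let file = OpenOptions::new().create(true).append(true).open(output_path)?;
    let mut output = BufWriter::new(file);
    let mut written = 0u64;
    for frozen in frozen_batches {
        output.write_all(frozen)?; // buffered: no per-batch open/close syscalls
        written += frozen.len() as u64;
    }
    output.flush()?; // single flush; file closed once on drop
    Ok(written)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("comet_shuffle_sketch.bin");
    let _ = std::fs::remove_file(&path);
    let n = write_batches(&path, &[vec![1u8; 3], vec![2u8; 5]])?;
    assert_eq!(n, 8);
    std::fs::remove_file(&path)?;
    Ok(())
}
```

      BufWriter additionally coalesces small batch writes into fewer write 
syscalls, which helps when batches are small.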
   
     ---
     4. Builder Re-creation Per Batch
   
     Location: row.rs:782-787
   
      while current_row < row_num {
          let mut data_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
          schema.iter().try_for_each(|dt| {
              // NEW builders on each batch iteration
              make_builders(dt, n, prefer_dictionary_ratio)
                  .map(|builder| data_builders.push(builder))
          })?;
          // ...
      }
   
     Impact: Heap allocations for builders on every batch. Builders could be 
reset and reused.
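      Recommended fix 3 leans on the fact that arrow-rs builders reset 
themselves on finish(): the finished array is emitted and the builder is left 
empty with its allocations intact. A minimal stand-in builder showing the same 
reset-on-finish pattern, simplified to i64 values (the real code holds 
Vec<Box<dyn ArrayBuilder>> built from the schema):

```rust
// Minimal stand-in for an Arrow array builder, simplified to i64 values.
// arrow-rs builders behave analogously: finish() emits the array and
// resets the builder for reuse.
struct Int64Builder {
    values: Vec<i64>,
}

impl Int64Builder {
    fn with_capacity(cap: usize) -> Self {
        Self { values: Vec::with_capacity(cap) }
    }

    fn append_value(&mut self, v: i64) {
        self.values.push(v);
    }

    // Emit the finished batch and leave the builder empty but with its
    // backing capacity intact, ready for the next batch.
    fn finish(&mut self) -> Vec<i64> {
        self.values.drain(..).collect()
    }
}

fn main() {
    // Builders are created ONCE, before the batch loop, not per batch.
    let mut builder = Int64Builder::with_capacity(1024);
    for batch in [[1i64, 2, 3], [4, 5, 6]] {
        for v in batch {
            builder.append_value(v);
        }
        let arr = builder.finish(); // resets the builder instead of dropping it
        assert_eq!(arr.len(), 3);
    }
}
```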
   
     ---
     5. Column-by-Column Row Access (Cache Unfriendly)
   
     Location: row.rs:792-803
   
     for (idx, builder) in data_builders.iter_mut().enumerate() {
         append_columns(row_addresses_ptr, row_sizes_ptr, row_start, row_end,
                        schema, idx, builder, ...)?;
     }
   
     Inside append_columns:
     for i in row_start..row_end {
         let row_addr = unsafe { *row_addresses_ptr.add(i) };
         // Access row data for column idx
     }
   
     Impact: For each column, iterates all rows. Row data is accessed multiple 
times (once per column), causing cache misses.
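      Recommended fix 5 inverts the loop nesting: visit each row once and 
append to every column builder in that single pass, so the row's bytes stay 
cache-hot while all of its columns are read. A sketch with decoding simplified 
to plain i64 columns (builders stand in for the Arrow column builders):

```rust
// Row-major pass: one traversal of the rows, touching every column per row.
fn append_row_major(rows: &[Vec<i64>], column_builders: &mut [Vec<i64>]) {
    for row in rows {
        // Each row is read once; in the column-major version every row is
        // re-read once per column, evicting and refetching cache lines.
        for (col, builder) in column_builders.iter_mut().enumerate() {
            builder.push(row[col]);
        }
    }
}

fn main() {
    let rows = vec![vec![1i64, 10], vec![2, 20], vec![3, 30]];
    let mut builders = vec![Vec::new(), Vec::new()];
    append_row_major(&rows, &mut builders);
    assert_eq!(builders[0], vec![1, 2, 3]);
    assert_eq!(builders[1], vec![10, 20, 30]);
}
```

      The trade-off is that per-column work is interleaved, so this is worth 
benchmarking against the current column-major path rather than assuming a win.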
   
     ---
     6. Double Data Copy Path
   
     Location: CometUnsafeShuffleWriter.java:296-310 and 
CometShuffleExternalSorter.java:642-649
   
     // Writer: serialize to buffer
     serOutputStream.writeValue((UnsafeRow) record._2(), OBJECT_CLASS_TAG);
   
     // Sorter: copy to memory page  
     Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
   
      Data path: UnsafeRow → serialized buffer → memory page → native Arrow conversion

      Impact: each row is copied at least twice on the JVM side before it reaches the native Arrow conversion.
   
     ---
     Recommended Fixes (Priority Order)
   
     1. Batch partition writes: Collect all rows across partitions, then make a 
single JNI call that writes all partitions. Pass partition boundaries to native 
code.
     2. Reuse file handle: Open file once at start of 
process_sorted_row_partition, write all batches, close once.
     3. Reuse Arrow builders: Create builders once, call builder.finish() and 
recreate only when schema changes, or use builder reset patterns.
     4. Pre-allocate RowPartition arrays: Use primitive arrays with a cursor 
instead of ArrayBuffer to avoid toArray copies.
     5. Row-major processing: Consider processing row-by-row to improve cache 
locality, appending to all column builders in one pass per row.
   
   
   
   ### Describe the potential solution
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

