maytasm commented on code in PR #19357:
URL: https://github.com/apache/druid/pull/19357#discussion_r3192535630


##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########
@@ -293,6 +320,22 @@ public void setSpillingAllowed(final boolean 
spillingAllowed)
   @Override
   public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
   {
+    // Flush any runs that did not reach MIN_SPILL_FILE_BYTES during the spill 
phase.
+    try {
+      flushPendingRunsToDisk();

Review Comment:
   Overhead breakdown of flushPendingRunsToDisk():                              
                                                                                
                      
     1. LZ4 decompress — fast (~GB/s)                                           
                                                                                
                        
     2. JSON parse — moderate (dominant cost)                                   
                                                                                
                        
     3. Merge-sort comparison — cheap (O(N log K), K = few pending runs)        
                                                                                
                        
     4. JSON serialize — moderate (dominant cost)                               
                                                                                
                        
     5. LZ4 compress + write — fast                                             
                                                                                
                        
                                                                                
                                                                                
                        
   Replacing mergeSorted with concat (step 3) saves very little — the JSON 
serde in steps 2+4 dominates.
   
   The other approach is to write each pending run's raw byte[] sequentially 
into one file (each is already a complete LZ4+JSON stream). At read time, 
create one iterator per sub-stream. The catch with this approach is that 
LZ4BlockInputStream stops at each stream boundary, so reading N streams from 
one file requires creating N LZ4BlockInputStream instances on the same 
underlying FileInputStream. LZ4BlockInputStream allocates a single 
decompression buffer (default 64KB, matching LZ4BlockOutputStream's default 
block size). With a lot of spills (the scenario is are trying to fix with large 
aggregators + high cardinality group bys), these LZ4BlockInputStream will adds 
up resulting in OOM like before. 
   
   The merge-sort serde cost in flushPendingRunsToDisk() is the price we pay 
for keeping both file count and read-time memory bounded. And as noted earlier, 
replacing mergeSorted with concat alone saves very little since JSON serde 
dominates the cost.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to