jtuglu1 commented on code in PR #19357:
URL: https://github.com/apache/druid/pull/19357#discussion_r3192193001


##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########


Review Comment:
   Now that we are properly tracking bytes during deletion, I wonder if we 
should also look at deleting `dictionaryFiles` as well here.



##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########
@@ -72,6 +74,17 @@ public class SpillingGrouper<KeyType> implements 
Grouper<KeyType>
       "Maximum number of spill files reached for this query. Try raising 
druid.query.groupBy.maxSpillFileCount."
   );
 
+  /**
+   * Minimum number of serialized bytes that must accumulate across pending 
in-memory spill runs before they are
+   * flushed as a single file to disk. Aggregators like ThetaSketch 
pre-allocate a large fixed buffer per row
+   * (e.g. ~131KB for ThetaSketch(K=16384)), causing the in-memory grouper to 
flush frequently. However, when
+   * each key has been seen only a few times, the sketch serializes to just a 
handful of bytes in compact form.
+   * Without batching, this produces thousands of tiny spill files. By 
accumulating runs in heap memory first
+   * and writing to disk only once this threshold is reached, we avoid that 
explosion in file count without any
+   * extra disk I/O for small spills.
+   */
+  private static final long MIN_SPILL_FILE_BYTES = 1024 * 1024L; // 1MB

Review Comment:
   let's make this a config. sketch sizes can vary with the columns they cover 
and the k-values.



##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java:
##########
@@ -121,13 +122,15 @@ public void delete(final File file)
   {
     synchronized (files) {
       if (files.contains(file)) {
+        final long fileSize = file.length();
         try {
           Files.delete(file.toPath());
         }
         catch (IOException e) {
           log.warn(e, "Cannot delete file: %s", file);
         }
         files.remove(file);
+        bytesUsed.addAndGet(-fileSize);

Review Comment:
   if this delete fails, I'd rather not have our accounting also be inaccurate 
– can we put this in the try after the delete?



##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########
@@ -293,6 +320,22 @@ public void setSpillingAllowed(final boolean 
spillingAllowed)
   @Override
   public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
   {
+    // Flush any runs that did not reach MIN_SPILL_FILE_BYTES during the spill 
phase.
+    try {
+      flushPendingRunsToDisk();

Review Comment:
   we should see what the overhead of `sorted=false` is here. If we don't need 
a sorted run as the end result, we can just do a simple concat to avoid the 
decompress/re-sort overhead of merge sort. I think we might need to condition 
this on `sortHasNonGroupingFields=false` too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to