jtuglu1 commented on code in PR #19357:
URL: https://github.com/apache/druid/pull/19357#discussion_r3192789245


##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########
@@ -345,12 +354,103 @@ public Entry<KeyType> apply(Entry<KeyType> entry)
 
   private void spill() throws IOException
   {
+    // Stream directly to a temp file first, then check the file size. If the 
file is small
+    // (serialized size much smaller than the pre-allocated buffer, e.g. HLL 
sketches in List mode),
+    // read it back into memory for batching to avoid creating thousands of 
tiny disk files.
+    // If the file is already large enough, keep it on disk as-is.
+    final File file;
     try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
-      files.add(spill(iterator));
-      dictionaryFiles.add(spill(keySerde.getDictionary().iterator()));
+      file = spill(iterator);
+    }
+    pendingDictionaryEntries.addAll(keySerde.getDictionary());
+    grouper.reset();
+
+    final long fileSize = file.length();
+    if (fileSize < minSpillFileSize) {
+      pendingSpillRuns.add(Files.readAllBytes(file.toPath()));
+      pendingSpillBytes += fileSize;
+      temporaryStorage.delete(file);
+
+      if (pendingSpillBytes >= minSpillFileSize) {
+        flushPendingRunsToDisk();
+      }
+    } else {
+      files.add(file);
+      dictionaryFiles.add(spill(pendingDictionaryEntries.iterator()));
+      pendingDictionaryEntries.clear();
+    }
+  }
+
+  /**
+   * Merge-sorts all pending in-memory spill runs and writes them as a single 
sorted file to disk.
+   * Each run is already individually sorted (from grouper.iterator(true)); 
this method merges them
+   * so the output file is fully sorted, as required by iterator()'s 
mergeSorted across files.
+   */
+  private void flushPendingRunsToDisk() throws IOException
+  {
+    if (pendingSpillRuns.isEmpty()) {
+      return;
+    }
+
+    final Comparator<Entry<KeyType>> sortComparator =
+        sortHasNonGroupingFields ? defaultOrderKeyObjComparator : 
keyObjComparator;
 
-      grouper.reset();
+    final List<MappingIterator<Entry<KeyType>>> readers = new 
ArrayList<>(pendingSpillRuns.size());
+    try {
+      for (final byte[] runBytes : pendingSpillRuns) {
+        readers.add(spillMapper.readValues(
+            spillMapper.getFactory().createParser(new LZ4BlockInputStream(new 
ByteArrayInputStream(runBytes))),
+            
spillMapper.getTypeFactory().constructParametricType(ReusableEntry.class, 
keySerde.keyClazz())
+        ));
+      }
+      final List<CloseableIterator<Entry<KeyType>>> iterators = new 
ArrayList<>(readers.size());
+      for (final MappingIterator<Entry<KeyType>> reader : readers) {
+        iterators.add(deserializeIterator(reader));
+      }
+      files.add(spill(CloseableIterators.mergeSorted(iterators, 
sortComparator)));
+      dictionaryFiles.add(spill(pendingDictionaryEntries.iterator()));
     }
+    finally {
+      for (final MappingIterator<Entry<KeyType>> reader : readers) {
+        try {
+          reader.close();
+        }
+        catch (IOException e) {
+          log.warn(e, "Failed to close reader while flushing pending spill 
runs");
+        }
+      }
+      pendingSpillRuns.clear();
+      pendingSpillBytes = 0;
+      pendingDictionaryEntries.clear();
+    }
+  }
+
+  private CloseableIterator<Entry<KeyType>> deserializeIterator(final 
Iterator<Entry<KeyType>> iterator)
+  {
+    return CloseableIterators.withEmptyBaggage(
+        Iterators.transform(
+            iterator,
+            new Function<>()
+            {
+              final ReusableEntry<KeyType> reusableEntry =
+                  ReusableEntry.create(keySerde, aggregatorFactories.length);
+
+              @Override
+              public Entry<KeyType> apply(Entry<KeyType> entry)
+              {
+                final Object[] deserializedValues = reusableEntry.getValues();
+                for (int i = 0; i < deserializedValues.length; i++) {
+                  deserializedValues[i] = 
aggregatorFactories[i].deserialize(entry.getValues()[i]);
+                  if (deserializedValues[i] instanceof Integer) {

Review Comment:
   Why are we coercing to long values here?



##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########
@@ -345,12 +354,103 @@ public Entry<KeyType> apply(Entry<KeyType> entry)
 
   private void spill() throws IOException
   {
+    // Stream directly to a temp file first, then check the file size. If the 
file is small
+    // (serialized size much smaller than the pre-allocated buffer, e.g. HLL 
sketches in List mode),
+    // read it back into memory for batching to avoid creating thousands of 
tiny disk files.
+    // If the file is already large enough, keep it on disk as-is.
+    final File file;
     try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
-      files.add(spill(iterator));
-      dictionaryFiles.add(spill(keySerde.getDictionary().iterator()));
+      file = spill(iterator);
+    }
+    pendingDictionaryEntries.addAll(keySerde.getDictionary());
+    grouper.reset();
+
+    final long fileSize = file.length();

Review Comment:
   I wonder if there might be an optimization here to reserve entire needed 
capacity for the array upfront to prevent it needing to double up to sufficient 
capacity. Since we only .clear() and not something like `trimToSize()` this 
resizing might be amortized across all the files, especially if they are of 
similar size.



##########
processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouperTest.java:
##########
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.groupby.GroupByStatsProvider;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SpillingGrouperTest extends InitializedNullHandlingTest
+{
+  private static final AggregatorFactory[] AGGREGATOR_FACTORIES = new 
AggregatorFactory[]{
+      new LongSumAggregatorFactory("valueSum", "value"),
+      new CountAggregatorFactory("count")
+  };
+  private static final int KEY_SIZE = new IntKeySerde().keySize();
+  private static final float MAX_LOAD_FACTOR = 0.75f;
+  private static final int INITIAL_BUCKETS = 4;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void testNoSpilling() throws IOException
+  {
+    final File storageDir = temporaryFolder.newFolder();
+    //  Only 3 keys with a 10,000-byte buffer. Everything fits in memory
+    try (SpillingGrouper<IntKey> grouper = makeGrouper(10000, storageDir, 1024 
* 1024, 100)) {
+      for (int i = 0; i < 3; i++) {
+        Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+      }
+
+      assertResultsCorrect(grouper, 3, 1);
+      Assert.assertEquals(0, storageDir.listFiles().length);
+    }
+  }
+
+  @Test
+  public void testSpillAndIterateSorted() throws IOException
+  {
+    final File storageDir = temporaryFolder.newFolder();
+    final int numKeys = 100;
+    // 100 unique keys force many spills since buffer is only 50 bytes. With 
iterator(true), results should be sorted ascending by key.
+    try (SpillingGrouper<IntKey> grouper = makeGrouper(50, storageDir, 1024 * 
1024, 100)) {
+      for (int i = 0; i < numKeys; i++) {
+        Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+      }
+
+      try (CloseableIterator<Grouper.Entry<IntKey>> iterator = 
grouper.iterator(true)) {
+        Assert.assertTrue("spilling should have occurred", 
storageDir.listFiles().length > 0);
+        int prevKey = -1;
+        int count = 0;
+        while (iterator.hasNext()) {
+          Grouper.Entry<IntKey> entry = iterator.next();
+          Assert.assertTrue(
+              "keys should be sorted ascending",
+              entry.getKey().intValue() > prevKey
+          );
+          prevKey = entry.getKey().intValue();
+          Assert.assertEquals(1L, entry.getValues()[0]);
+          Assert.assertEquals(1L, entry.getValues()[1]);
+          count++;
+        }
+        Assert.assertEquals(numKeys, count);
+      }
+    }
+  }
+
+  @Test
+  public void testSpillAndIterateUnsorted() throws IOException
+  {
+    final File storageDir = temporaryFolder.newFolder();
+    final int numKeys = 100;
+    // 100 unique keys force many spills since buffer is only 50 bytes. With 
iterator(false), results may be in any order, but all keys should be present 
with correct values.
+    try (SpillingGrouper<IntKey> grouper = makeGrouper(50, storageDir, 1024 * 
1024, 100)) {
+      for (int i = 0; i < numKeys; i++) {
+        Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+      }
+
+      assertResultsCorrect(grouper, numKeys, 1);
+      Assert.assertTrue("spilling should have occurred", 
storageDir.listFiles().length > 0);
+    }
+  }
+
+  @Test
+  public void testAggregatesDuplicateKeys() throws IOException
+  {
+    // SpillingGrouper doesn't combine across spills — duplicate keys from 
different spill files
+    // appear as separate entries in the sorted iterator. Verify that the 
total aggregate values
+    // per key sum to the expected amount across all entries.
+    final File storageDir = temporaryFolder.newFolder();
+    final int numKeys = 20;
+    final int duplicates = 5;
+    try (SpillingGrouper<IntKey> grouper = makeGrouper(50, storageDir, 1024 * 
1024, 100)) {
+      for (int round = 0; round < duplicates; round++) {
+        for (int i = 0; i < numKeys; i++) {
+          Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+        }
+      }
+
+      int totalEntries = 0;
+      final Map<Integer, Long> totalCounts = new HashMap<>();
+      try (CloseableIterator<Grouper.Entry<IntKey>> iterator = 
grouper.iterator(true)) {
+        Assert.assertTrue("spilling should have occurred", 
storageDir.listFiles().length > 0);
+        while (iterator.hasNext()) {
+          Grouper.Entry<IntKey> entry = iterator.next();
+          totalCounts.merge(entry.getKey().intValue(), (Long) 
entry.getValues()[1], Long::sum);
+          totalEntries++;
+        }
+      }
+      Assert.assertTrue(
+          "duplicate keys should exist across spills, so total entries (" + 
totalEntries
+          + ") should exceed unique key count (" + numKeys + ")",
+          totalEntries > numKeys
+      );
+      Assert.assertEquals(numKeys, totalCounts.size());
+      for (Map.Entry<Integer, Long> e : totalCounts.entrySet()) {
+        Assert.assertEquals(
+            "total count for key " + e.getKey(),
+            (long) duplicates,
+            (long) e.getValue()
+        );
+      }
+    }
+  }
+
+  @Test
+  public void testSmallSpillsAreBatched() throws IOException
+  {
+    final File storageDir = temporaryFolder.newFolder();
+    final int bufferSize = 50;
+    final int numKeys = 100;
+
+    int maxUsableEntries = computeMaxUsableEntries(bufferSize);
+    Assert.assertEquals(
+        "buffer should hold at most 1 entry, guaranteeing a spill on every 
key",
+        1,
+        maxUsableEntries
+    );
+
+    try (SpillingGrouper<IntKey> grouper = makeGrouper(bufferSize, storageDir, 
1024 * 1024, 100)) {
+      for (int i = 0; i < numKeys; i++) {
+        Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+      }
+
+      assertResultsCorrect(grouper, numKeys, 1);
+
+      File[] files = storageDir.listFiles();
+      Assert.assertNotNull(files);
+      Assert.assertEquals(
+          "all spills are tiny and should batch into a single data + 
dictionary file pair",
+          2,
+          files.length
+      );
+    }
+  }
+
+  @Test
+  public void testResetClearsPendingState() throws IOException
+  {
+    try (SpillingGrouper<IntKey> grouper = makeGrouper(50, 
temporaryFolder.newFolder(), 1024 * 1024, 100)) {
+      for (int i = 0; i < 50; i++) {
+        Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+      }
+
+      grouper.reset();
+
+      for (int i = 1000; i < 1010; i++) {
+        Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+      }
+
+      try (CloseableIterator<Grouper.Entry<IntKey>> iterator = 
grouper.iterator(true)) {
+        int count = 0;
+        while (iterator.hasNext()) {
+          Grouper.Entry<IntKey> entry = iterator.next();
+          Assert.assertTrue(
+              "keys should be >= 1000 after reset",
+              entry.getKey().intValue() >= 1000
+          );
+          count++;
+        }
+        Assert.assertEquals(10, count);
+      }
+    }
+  }
+
+  @Test
+  public void testDiskFull() throws IOException
+  {
+    try (SpillingGrouper<IntKey> grouper = makeGrouper(50, 
temporaryFolder.newFolder(), 10, 100)) {
+      AggregateResult lastResult = AggregateResult.ok();
+      for (int i = 0; i < 10000 && lastResult.isOk(); i++) {
+        lastResult = grouper.aggregate(new IntKey(i));
+      }
+
+      Assert.assertFalse("should have hit disk full", lastResult.isOk());
+      Assert.assertTrue(
+          "reason should mention disk space",
+          lastResult.getReason().contains("Not enough disk space")
+      );

Review Comment:
   Could we maybe add some asserts/another test which verify the statistics on 
file delete errors? Something that would catch regressions for the bug that was 
patched in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to