This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 56a10f50e5e fix: Optimizes SpillingGrouper for high cardinality
dimension(s) GroupBy with large memory footprint aggregators (#19357)
56a10f50e5e is described below
commit 56a10f50e5eeee5dc577692a64706bdda7c7e26a
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Thu May 7 10:58:56 2026 -0700
fix: Optimizes SpillingGrouper for high cardinality dimension(s) GroupBy
with large memory footprint aggregators (#19357)
---
docs/configuration/index.md | 2 +
docs/querying/groupbyquery.md | 2 +
.../druid/query/groupby/GroupByQueryConfig.java | 22 +
.../groupby/epinephelinae/ConcurrentGrouper.java | 10 +-
.../epinephelinae/LimitedTemporaryStorage.java | 15 +-
.../epinephelinae/RowBasedGrouperHelper.java | 1 +
.../groupby/epinephelinae/SpillingGrouper.java | 194 +++++++--
.../epinephelinae/ConcurrentGrouperTest.java | 2 +
.../groupby/epinephelinae/SpillingGrouperTest.java | 458 +++++++++++++++++++++
9 files changed, 672 insertions(+), 34 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 8f4e4a7dcbf..4036ab7fb50 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -2231,6 +2231,7 @@ Supported runtime properties:
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
|`druid.query.groupBy.maxSpillFileCount`|Maximum number of spill files allowed per GroupBy query. Queries that exceed this limit will fail. See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|Integer.MAX_VALUE (unlimited)|
+|`druid.query.groupBy.minSpillFileSize`|Minimum number of serialized bytes that must accumulate across pending in-memory spill runs before they are merged and flushed as a single file to disk. Spills smaller than this are batched in heap memory to avoid creating many tiny files. Higher values reduce file count but increase heap usage.|1048576 (1 MiB)|
|`druid.query.groupBy.defaultOnDiskStorage`|Default amount of disk space to use, per-query, for spilling the result sets to disk when either the merging buffer or the dictionary fills up. Set to zero to disable disk spilling for queries which don't override `maxOnDiskStorage` in their context.|`druid.query.groupBy.maxOnDiskStorage`|
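The `minSpillFileSize` row describes a trade-off between file count and heap usage. The sketch below replays the batching rule from the `SpillingGrouper.spill()` change later in this commit:

- A spill whose serialized size is at least `minSpillFileSize` stays on disk as its own file.
- Smaller spills accumulate in heap until their combined size reaches the threshold, and are then flushed as one file.
- Anything still pending is flushed when results are iterated.

`SpillBatchingSim` is a hypothetical helper, not Druid code. It ignores dictionary files and takes per-spill sizes as input rather than measuring them. A run is held in heap only while it is below the threshold, and a batch is flushed as soon as it reaches the threshold. Under this model, pending bytes per grouper therefore stay below `2 * minSpillFileSize`.

```java
import java.util.List;

/**
 * Hypothetical replay of the spill-batching rule (not Druid code).
 * Each element of runSizes is the serialized size, in bytes, of one buffer flush.
 */
final class SpillBatchingSim {

  /** Number of data files written and the most run bytes held in heap at once. */
  static final class Result {
    final int dataFiles;
    final long peakPendingBytes;

    Result(int dataFiles, long peakPendingBytes) {
      this.dataFiles = dataFiles;
      this.peakPendingBytes = peakPendingBytes;
    }
  }

  static Result simulate(List<Long> runSizes, long minSpillFileSize) {
    int dataFiles = 0;
    int pendingRuns = 0;
    long pendingBytes = 0;
    long peakPendingBytes = 0;

    for (long size : runSizes) {
      if (size >= minSpillFileSize) {
        // Large enough: keep this spill on disk as its own file.
        // Runs that are already pending stay pending.
        dataFiles++;
        continue;
      }
      // Small spill: hold its bytes in heap until the batch is big enough.
      pendingRuns++;
      pendingBytes += size;
      peakPendingBytes = Math.max(peakPendingBytes, pendingBytes);
      if (pendingBytes >= minSpillFileSize) {
        // Merge all pending runs into a single file.
        dataFiles++;
        pendingRuns = 0;
        pendingBytes = 0;
      }
    }
    // Iterating the results flushes leftovers even if they are below the threshold.
    if (pendingRuns > 0) {
      dataFiles++;
    }
    return new Result(dataFiles, peakPendingBytes);
  }
}
```

Consider 100 spills of 100 bytes each. With a 1024-byte threshold the simulation reports 10 data files, with at most 1100 bytes pending. With a threshold of 0 every spill becomes its own file, giving 100.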
Supported query contexts:
@@ -2241,6 +2242,7 @@ Supported query contexts:
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxOnDiskStorage`|Can be used to set `maxOnDiskStorage` to a value between 0 and `druid.query.groupBy.maxOnDiskStorage` for this query. If this query context override exceeds `druid.query.groupBy.maxOnDiskStorage`, the query will use `druid.query.groupBy.maxOnDiskStorage`. Omitting this from the query context will cause the query to use `druid.query.groupBy.defaultOnDiskStorage` for `maxOnDiskStorage`.|
|`maxSpillFileCount`|Can be used to override the value of `druid.query.groupBy.maxSpillFileCount` for this query.|
+|`minSpillFileSize`|Can be used to override the value of `druid.query.groupBy.minSpillFileSize` for this query.|
### Advanced configurations
diff --git a/docs/querying/groupbyquery.md b/docs/querying/groupbyquery.md
index cf8aea8f505..be4ff0462a5 100644
--- a/docs/querying/groupbyquery.md
+++ b/docs/querying/groupbyquery.md
@@ -358,6 +358,7 @@ Supported runtime properties:
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. If set to `0` (automatic), each query's dictionary uses 30% of the Java heap divided by `druid.processing.numMergeBuffers`, or 1GB, whichever is smaller.<br /><br />See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details on changing this property.|0 ( [...]
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
|`druid.query.groupBy.maxSpillFileCount`|Maximum number of spill files allowed per GroupBy query. Queries that exceed this limit will fail.<br /><br />See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details on changing this property.|Integer.MAX_VALUE (unlimited)|
+|`druid.query.groupBy.minSpillFileSize`|Minimum number of serialized bytes that must accumulate across pending in-memory spill runs before they are merged and flushed as a single file to disk. Spills smaller than this are batched in heap memory to avoid creating many tiny files. Higher values reduce file count but increase heap usage.|1048576 (1 MiB)|
Supported query contexts:
@@ -365,6 +366,7 @@ Supported query contexts:
|---|-----------|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
|`maxSpillFileCount`|Can be used to override the value of `druid.query.groupBy.maxSpillFileCount` for this query.|
+|`minSpillFileSize`|Can be used to override the value of `druid.query.groupBy.minSpillFileSize` for this query.|
### Advanced configurations
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
index c1336bc11c2..19270bb52f8 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
@@ -54,6 +54,7 @@ public class GroupByQueryConfig
private static final String CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE =
"maxSelectorDictionarySize";
private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE =
"maxMergingDictionarySize";
private static final String CTX_KEY_MAX_SPILL_FILE_COUNT =
"maxSpillFileCount";
+ private static final String CTX_KEY_MIN_SPILL_FILE_SIZE = "minSpillFileSize";
private static final String CTX_KEY_FORCE_HASH_AGGREGATION =
"forceHashAggregation";
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE =
"intermediateCombineDegree";
private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS =
"numParallelCombineThreads";
@@ -104,6 +105,16 @@ public class GroupByQueryConfig
// Max on-disk temporary storage, per-query; when exceeded, the query fails
private HumanReadableBytes maxOnDiskStorage = HumanReadableBytes.valueOf(0);
+ @JsonProperty
+ // Minimum number of serialized bytes that must accumulate across pending in-memory spill runs before they are
+ // flushed as a single file to disk. Aggregators like ThetaSketch pre-allocate a large fixed buffer per row
+ // (e.g. ~131KB for ThetaSketch(K=16384)), causing the in-memory grouper to flush frequently. However, when
+ // each key has been seen only a few times, the sketch serializes to just a handful of bytes in compact form.
+ // Without batching, this produces thousands of tiny spill files. By accumulating runs in heap memory first
+ // and writing to disk only once this threshold is reached, we avoid that explosion in file count. (Each small
+ // run is still streamed to a short-lived temporary file and read back into heap before that file is deleted.)
+ private HumanReadableBytes minSpillFileSize =
HumanReadableBytes.valueOf(1024 * 1024L);
+
@JsonProperty
private HumanReadableBytes defaultOnDiskStorage =
HumanReadableBytes.valueOf(-1);
@@ -251,6 +262,11 @@ public class GroupByQueryConfig
return maxSpillFileCount;
}
+ public long getMinSpillFileSize()
+ {
+ return minSpillFileSize.getBytes();
+ }
+
/**
* Mirror maxOnDiskStorage if defaultOnDiskStorage's default is not overridden by cluster operator.
*
@@ -357,6 +373,11 @@ public class GroupByQueryConfig
getMaxSpillFileCount()
);
+ newConfig.minSpillFileSize = queryContext.getHumanReadableBytes(
+ CTX_KEY_MIN_SPILL_FILE_SIZE,
+ getMinSpillFileSize()
+ );
+
newConfig.forcePushDownLimit =
queryContext.getBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit());
newConfig.applyLimitPushDownToSegment = queryContext.getBoolean(
CTX_KEY_APPLY_LIMIT_PUSH_DOWN_TO_SEGMENT,
@@ -400,6 +421,7 @@ public class GroupByQueryConfig
", bufferGrouperInitialBuckets=" + bufferGrouperInitialBuckets +
", maxMergingDictionarySize=" + maxMergingDictionarySize +
", maxOnDiskStorage=" + maxOnDiskStorage.getBytes() +
+ ", minSpillFileSize=" + minSpillFileSize.getBytes() +
", defaultOnDiskStorage=" + getDefaultOnDiskStorage().getBytes() +
// use the getter because of special behavior for mirroring maxOnDiskStorage if defaultOnDiskStorage not explicitly set.
", forcePushDownLimit=" + forcePushDownLimit +
", forceHashAggregation=" + forceHashAggregation +
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
index b4b4cb34701..8b9a342e4e2 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -95,6 +96,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
@Nullable
private final ParallelCombiner<KeyType> parallelCombiner;
private final boolean mergeThreadLocal;
+ private final long minSpillFileSize;
private final GroupByStatsProvider.PerQueryStats perQueryStats;
private volatile boolean initialized = false;
@@ -141,6 +143,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
groupByQueryConfig.getIntermediateCombineDegree(),
groupByQueryConfig.getNumParallelCombineThreads(),
groupByQueryConfig.isMergeThreadLocal(),
+ groupByQueryConfig.getMinSpillFileSize(),
perQueryStats
);
}
@@ -167,6 +170,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
final int intermediateCombineDegree,
final int numParallelCombineThreads,
final boolean mergeThreadLocal,
+ final long minSpillFileSize,
final GroupByStatsProvider.PerQueryStats perQueryStats
)
{
@@ -217,6 +221,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
}
this.mergeThreadLocal = mergeThreadLocal;
+ this.minSpillFileSize = minSpillFileSize;
this.perQueryStats = perQueryStats;
}
@@ -245,6 +250,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
limitSpec,
sortHasNonGroupingFields,
sliceSize,
+ minSpillFileSize,
perQueryStats
);
grouper.init();
@@ -452,7 +458,9 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
- throw new RuntimeException(e.getCause());
+ Throwable cause = e.getCause();
+ Throwables.throwIfUnchecked(cause);
+ throw new RuntimeException(cause);
}
}
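The new `catch` block changes how failures from worker threads reach the caller. Unchecked causes are rethrown as-is; only checked causes are still wrapped in a generic `RuntimeException`. Presumably the goal is that errors like the disk-full and file-count `ResourceLimitExceededException`s, now raised from `SpillingGrouper.iterator()`, keep their original type.

The sketch below reproduces that behavior without Guava, approximating `Throwables.throwIfUnchecked` with explicit `instanceof` checks. `CauseUnwrapping` is a hypothetical name, not part of Druid.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Hypothetical helper mirroring the new catch block in ConcurrentGrouper:
 * unchecked causes are rethrown unchanged, checked causes are wrapped.
 */
final class CauseUnwrapping {

  static <T> T getUnwrapped(Future<T> future) {
    try {
      return future.get();
    }
    catch (InterruptedException e) {
      // Preserve the interrupt status before converting to an unchecked exception.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    catch (ExecutionException e) {
      final Throwable cause = e.getCause();
      // Same effect as Guava's Throwables.throwIfUnchecked(cause) for a non-null cause.
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      throw new RuntimeException(cause);
    }
  }
}
```

Under the old code, a caller catching a specific unchecked exception type would instead have seen a `RuntimeException` whose cause had to be inspected.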
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java
index 1e71abc06bc..b8b3a6001d5 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java
@@ -56,6 +56,7 @@ public class LimitedTemporaryStorage implements Closeable
private final AtomicLong bytesUsed = new AtomicLong();
private final Set<File> files = new TreeSet<>();
+ private int nextFileIndex = 0;
private volatile boolean closed = false;
@@ -105,7 +106,7 @@ public class LimitedTemporaryStorage implements Closeable
createdStorageDirectory = true;
}
- final File theFile = new File(storageDirectory,
StringUtils.format("%08d.tmp", files.size()));
+ final File theFile = new File(storageDirectory,
StringUtils.format("%08d.tmp", nextFileIndex++));
final EnumSet<StandardOpenOption> openOptions = EnumSet.of(
StandardOpenOption.CREATE_NEW,
StandardOpenOption.WRITE
@@ -121,8 +122,10 @@ public class LimitedTemporaryStorage implements Closeable
{
synchronized (files) {
if (files.contains(file)) {
+ final long fileSize = file.length();
try {
Files.delete(file.toPath());
+ bytesUsed.addAndGet(-fileSize);
}
catch (IOException e) {
log.warn(e, "Cannot delete file: %s", file);
@@ -143,6 +146,14 @@ public class LimitedTemporaryStorage implements Closeable
return bytesUsed.get();
}
+ @VisibleForTesting
+ public int currentFileCount()
+ {
+ synchronized (files) {
+ return files.size();
+ }
+ }
+
@Override
public void close()
{
@@ -152,8 +163,6 @@ public class LimitedTemporaryStorage implements Closeable
}
closed = true;
- perQueryStatsContainer.spilledBytes(bytesUsed.get());
-
bytesUsed.set(0);
for (File file : ImmutableSet.copyOf(files)) {
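Two accounting details change in `LimitedTemporaryStorage`.

- **Naming.** Spill files can now be deleted in the middle of a query, because `SpillingGrouper.spill()` reads small spills back into heap and deletes them immediately. Naming the next file after `files.size()` could therefore reuse the name of a file that still exists, and `CREATE_NEW` would then fail. A monotonically increasing `nextFileIndex` avoids the collision.
- **Byte quota.** `delete()` now returns the file's bytes to the quota. This is why the spilled-bytes statistic moves out of `close()` and is computed in `SpillingGrouper.close()` before the files are deleted.

The hypothetical `TempFileNaming` class below models only the naming and byte accounting. It creates no real files and does no locking; it exists just to make the collision reproducible.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical model of temp-file naming and byte accounting (not Druid code).
 * Files are represented only by their sizes.
 */
final class TempFileNaming {
  private final Map<String, Long> files = new HashMap<>();
  private final boolean useCounter;
  private int nextFileIndex = 0;
  private long bytesUsed = 0;

  TempFileNaming(boolean useCounter) {
    this.useCounter = useCounter;
  }

  /** Creates a file; fails, like StandardOpenOption.CREATE_NEW, if the name is taken. */
  String create(long size) {
    // Old scheme: index = current file count. New scheme: monotonically increasing counter.
    final int index = useCounter ? nextFileIndex++ : files.size();
    final String name = String.format(Locale.ROOT, "%08d.tmp", index);
    if (files.containsKey(name)) {
      throw new IllegalStateException("file already exists: " + name);
    }
    files.put(name, size);
    bytesUsed += size;
    return name;
  }

  /** Deletes a file and gives its bytes back to the quota. */
  void delete(String name) {
    final Long size = files.remove(name);
    if (size != null) {
      bytesUsed -= size;
    }
  }

  long bytesUsed() {
    return bytesUsed;
  }
}
```

With the old scheme, the sequence create, create, delete the first file, create tries to reuse `00000001.tmp` and fails. The counter-based scheme yields `00000002.tmp` instead.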
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index 7b7e4015fc1..97301cafa53 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -282,6 +282,7 @@ public class RowBasedGrouperHelper
limitSpec,
sortHasNonGroupingFields,
mergeBufferSize,
+ querySpecificConfig.getMinSpillFileSize(),
perQueryStats
);
} else {
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 904a7ef8864..7af71f92673 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -35,12 +35,14 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -80,11 +82,20 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
private final Comparator<Grouper.Entry<KeyType>> keyObjComparator;
private final Comparator<Grouper.Entry<KeyType>>
defaultOrderKeyObjComparator;
private final GroupByStatsProvider.PerQueryStats perQueryStats;
+ private final long minSpillFileSize;
private final List<File> files = new ArrayList<>();
private final List<File> dictionaryFiles = new ArrayList<>();
private final boolean sortHasNonGroupingFields;
+ // Pending spill runs not yet written to disk. Each entry is one buffer flush serialized as an
+ // LZ4-compressed JSON byte array (the same format as an on-disk spill file), so it can be decoded
+ // the same way read() decodes a spill file. Runs are held in heap memory and merged into a single
+ // sorted file only when pendingSpillBytes reaches minSpillFileSize.
+ private final List<byte[]> pendingSpillRuns = new ArrayList<>();
+ private final Set<String> pendingDictionaryEntries = new HashSet<>();
+ private long pendingSpillBytes = 0;
+
private boolean diskFull = false;
private boolean maxFileCount = false;
private boolean spillingAllowed;
@@ -103,6 +114,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
final DefaultLimitSpec limitSpec,
final boolean sortHasNonGroupingFields,
final int mergeBufferSize,
+ final long minSpillFileSize,
final GroupByStatsProvider.PerQueryStats perQueryStats
)
{
@@ -163,6 +175,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
this.spillMapper = keySerde.decorateObjectMapper(spillMapper);
this.spillingAllowed = spillingAllowed;
this.sortHasNonGroupingFields = sortHasNonGroupingFields;
+ this.minSpillFileSize = minSpillFileSize;
this.perQueryStats = perQueryStats;
}
@@ -225,6 +238,9 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
public void reset()
{
grouper.reset();
+ pendingSpillRuns.clear();
+ pendingSpillBytes = 0;
+ pendingDictionaryEntries.clear();
deleteFiles();
}
@@ -233,8 +249,20 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
{
perQueryStats.dictionarySize(getDictionarySizeEstimate());
perQueryStats.maxMergeBufferUsedBytes(getMaxMergeBufferUsedBytes());
+ // Record spilled bytes before deleteFiles() decrements bytesUsed in temporaryStorage.
+ long spilledBytes = 0;
+ for (final File file : files) {
+ spilledBytes += file.length();
+ }
+ for (final File file : dictionaryFiles) {
+ spilledBytes += file.length();
+ }
+ perQueryStats.spilledBytes(spilledBytes);
grouper.close();
keySerde.reset();
+ pendingSpillRuns.clear();
+ pendingSpillBytes = 0;
+ pendingDictionaryEntries.clear();
deleteFiles();
}
@@ -290,9 +318,36 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
this.spillingAllowed = spillingAllowed;
}
+ /**
+ * Returns an iterator over all grouped entries, merging results from the in-memory grouper and
+ * any spill files on disk. Pending in-memory spill runs are flushed to disk first. When sorted is
+ * true, uses a merge-sorted iterator across all sources; when false, simply concatenates them.
+ *
+ * <p>In practice, sorted is always true. {@link RowBasedGrouperHelper} hardcodes
+ * {@code grouper.iterator(true)} because the merge layer above (CombiningIterator in
+ * {@link ConcurrentGrouper} and the broker merge) detects duplicate keys by comparing
+ * consecutive sorted entries. So sorted=true is required for merge correctness, not output
+ * ordering. The sorted=false path exists but is unreachable through any production query path.
+ */
@Override
public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
{
+ // Flush any runs that did not reach minSpillFileSize during the spill phase.
+ try {
+ flushPendingRunsToDisk();
+ }
+ catch (TemporaryStorageFullException e) {
+ diskFull = true;
+ throw new ResourceLimitExceededException(DISK_FULL.getReason());
+ }
+ catch (TemporaryStorageFileLimitException e) {
+ maxFileCount = true;
+ throw new ResourceLimitExceededException(MAX_FILE.getReason());
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
final List<CloseableIterator<Entry<KeyType>>> iterators = new
ArrayList<>(1 + files.size());
iterators.add(grouper.iterator(sorted));
@@ -301,33 +356,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
for (final File file : files) {
final MappingIterator<Entry<KeyType>> fileIterator = read(file,
keySerde.keyClazz());
- iterators.add(
- CloseableIterators.withEmptyBaggage(
- Iterators.transform(
- fileIterator,
- new Function<>()
- {
- final ReusableEntry<KeyType> reusableEntry =
- ReusableEntry.create(keySerde,
aggregatorFactories.length);
-
- @Override
- public Entry<KeyType> apply(Entry<KeyType> entry)
- {
- final Object[] deserializedValues =
reusableEntry.getValues();
- for (int i = 0; i < deserializedValues.length; i++) {
- deserializedValues[i] =
aggregatorFactories[i].deserialize(entry.getValues()[i]);
- if (deserializedValues[i] instanceof Integer) {
- // Hack to satisfy the groupBy unit tests; perhaps we could do better by adjusting Jackson config.
- deserializedValues[i] = ((Integer)
deserializedValues[i]).longValue();
- }
- }
- reusableEntry.setKey(entry.getKey());
- return reusableEntry;
- }
- }
- )
- )
- );
+ iterators.add(deserializeIterator(fileIterator));
closer.register(fileIterator);
}
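The Javadoc argues that `sorted=true` is needed for correctness. The reason is that the combining layer above only compares each entry with the one immediately before it. A toy version makes that dependency visible. `ConsecutiveCombiner` is hypothetical and much simpler than the `CombiningIterator` the Javadoc refers to; keys are integers and values are summed longs.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical combiner (not Druid's CombiningIterator): it merges only
 * adjacent entries with equal keys, so it relies on key-sorted input.
 */
final class ConsecutiveCombiner {

  static List<Map.Entry<Integer, Long>> combine(List<Map.Entry<Integer, Long>> entries) {
    final List<Map.Entry<Integer, Long>> out = new ArrayList<>();
    for (Map.Entry<Integer, Long> entry : entries) {
      final int last = out.size() - 1;
      if (last >= 0 && out.get(last).getKey().equals(entry.getKey())) {
        // Same key as the previous output entry: fold this value into it.
        final long sum = out.get(last).getValue() + entry.getValue();
        out.set(last, new AbstractMap.SimpleEntry<Integer, Long>(entry.getKey(), sum));
      } else {
        // New key (or first entry): start a new output entry.
        out.add(new AbstractMap.SimpleEntry<Integer, Long>(entry.getKey(), entry.getValue()));
      }
    }
    return out;
  }
}
```

On the sorted input `[(1,1), (1,1), (2,1)]` it yields `[(1,2), (2,1)]`. On `[(1,1), (2,1), (1,1)]` key 1 stays split across two entries.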
@@ -345,14 +374,115 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
private void spill() throws IOException
{
+ // Stream directly to a temp file first, then check the file size. If the file is smaller than minSpillFileSize
+ // (serialized size much smaller than the pre-allocated buffer, e.g. HLL sketches in List mode),
+ // read it back into memory for batching to avoid creating thousands of tiny disk files.
+ // If the file is already large enough, keep it on disk as-is.
+ final File file;
try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
- files.add(spill(iterator));
- dictionaryFiles.add(spill(keySerde.getDictionary().iterator()));
+ file = spill(iterator);
+ }
+ pendingDictionaryEntries.addAll(keySerde.getDictionary());
+ grouper.reset();
+
+ final long fileSize = file.length();
+ if (fileSize < minSpillFileSize) {
+ pendingSpillRuns.add(Files.readAllBytes(file.toPath()));
+ pendingSpillBytes += fileSize;
+ temporaryStorage.delete(file);
+
+ if (pendingSpillBytes >= minSpillFileSize) {
+ flushPendingRunsToDisk();
+ }
+ } else {
+ files.add(file);
+ dictionaryFiles.add(spill(pendingDictionaryEntries.iterator()));
+ pendingDictionaryEntries.clear();
+ }
+ }
- grouper.reset();
+ /**
+ * Merge-sorts all pending in-memory spill runs and writes them as a single sorted file to disk.
+ * Each run is already individually sorted (from grouper.iterator(true)); this method merges them
+ * so the output file is fully sorted, as required by iterator()'s mergeSorted across files.
+ * <p>
+ * We always merge-sort rather than concatenating runs (regardless of the sorted / sortHasNonGroupingFields flags).
+ * The processing cost is dominated by JSON deserialization and re-serialization; the merge-sort comparison itself
+ * is O(N log K) key comparisons (N entries across K runs) and negligible relative to the serde overhead, so
+ * concatenation would save little.
+ * <p>
+ * An alternative approach of writing each pending run's raw byte[] sequentially into one file
+ * (avoiding serde entirely) was rejected because at read time each sub-stream would require its own
+ * LZ4BlockInputStream with an internal buffer. With a large number of small spills, we can end up with a large
+ * number of sub-streams, each with its own buffer, which can lead to OOM. By merging runs together, we ensure that
+ * the number of spill files (and thus sub-streams) is small regardless of spill pattern.
+ */
+ private void flushPendingRunsToDisk() throws IOException
+ {
+ if (pendingSpillRuns.isEmpty()) {
+ return;
+ }
+
+ final Comparator<Entry<KeyType>> sortComparator =
+ sortHasNonGroupingFields ? defaultOrderKeyObjComparator :
keyObjComparator;
+
+ final List<MappingIterator<Entry<KeyType>>> readers = new
ArrayList<>(pendingSpillRuns.size());
+ try {
+ for (final byte[] runBytes : pendingSpillRuns) {
+ readers.add(spillMapper.readValues(
+ spillMapper.getFactory().createParser(new LZ4BlockInputStream(new ByteArrayInputStream(runBytes))),
+ spillMapper.getTypeFactory().constructParametricType(ReusableEntry.class, keySerde.keyClazz())
+ ));
+ }
+ final List<CloseableIterator<Entry<KeyType>>> iterators = new
ArrayList<>(readers.size());
+ for (final MappingIterator<Entry<KeyType>> reader : readers) {
+ iterators.add(deserializeIterator(reader));
+ }
+ files.add(spill(CloseableIterators.mergeSorted(iterators,
sortComparator)));
+ dictionaryFiles.add(spill(pendingDictionaryEntries.iterator()));
+ }
+ finally {
+ for (final MappingIterator<Entry<KeyType>> reader : readers) {
+ try {
+ reader.close();
+ }
+ catch (IOException e) {
+ log.warn(e, "Failed to close reader while flushing pending spill runs");
+ }
+ }
+ pendingSpillRuns.clear();
+ pendingSpillBytes = 0;
+ pendingDictionaryEntries.clear();
}
}
+ private CloseableIterator<Entry<KeyType>> deserializeIterator(final
Iterator<Entry<KeyType>> iterator)
+ {
+ return CloseableIterators.withEmptyBaggage(
+ Iterators.transform(
+ iterator,
+ new Function<>()
+ {
+ final ReusableEntry<KeyType> reusableEntry =
+ ReusableEntry.create(keySerde, aggregatorFactories.length);
+
+ @Override
+ public Entry<KeyType> apply(Entry<KeyType> entry)
+ {
+ final Object[] deserializedValues = reusableEntry.getValues();
+ for (int i = 0; i < deserializedValues.length; i++) {
+ deserializedValues[i] =
aggregatorFactories[i].deserialize(entry.getValues()[i]);
+ if (deserializedValues[i] instanceof Integer) {
+ deserializedValues[i] = ((Integer)
deserializedValues[i]).longValue();
+ }
+ }
+ reusableEntry.setKey(entry.getKey());
+ return reusableEntry;
+ }
+ }
+ )
+ );
+ }
+
private <T> File spill(Iterator<T> iterator) throws IOException
{
try (
@@ -390,5 +520,9 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
temporaryStorage.delete(file);
}
files.clear();
+ for (final File file : dictionaryFiles) {
+ temporaryStorage.delete(file);
+ }
+ dictionaryFiles.clear();
}
}
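`flushPendingRunsToDisk()` hands K individually sorted runs to `CloseableIterators.mergeSorted` and writes the result as one sorted file. Its Javadoc puts the merge cost at O(N log K) key comparisons. The generic sketch below shows a standard heap-based k-way merge with that cost, on in-memory lists rather than LZ4-compressed JSON streams.

This illustrates the technique only; it is not Druid's `CloseableIterators` implementation, and `KWayMerge` is a hypothetical name. As in the test comment about duplicate keys, equal keys from different runs remain separate entries.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical heap-based k-way merge of individually sorted runs (not Druid's CloseableIterators). */
final class KWayMerge {

  static <T> List<T> mergeSorted(List<List<T>> runs, Comparator<? super T> comparator) {
    // Current head element of one run, plus the iterator over the rest of that run.
    final class Head {
      T value;
      final Iterator<T> rest;

      Head(T value, Iterator<T> rest) {
        this.value = value;
        this.rest = rest;
      }
    }

    final Comparator<Head> byHead = (a, b) -> comparator.compare(a.value, b.value);
    final PriorityQueue<Head> heap = new PriorityQueue<>(byHead);
    for (List<T> run : runs) {
      final Iterator<T> it = run.iterator();
      if (it.hasNext()) {
        heap.add(new Head(it.next(), it));
      }
    }

    // The heap never holds more than K heads, so each poll/add is O(log K): O(N log K) overall.
    final List<T> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      final Head head = heap.poll();
      merged.add(head.value);
      if (head.rest.hasNext()) {
        head.value = head.rest.next();
        heap.add(head);
      }
    }
    return merged;
  }
}
```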
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
index bb41f92f782..0b793f429b4 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
@@ -179,6 +179,7 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
4,
parallelCombineThreads,
mergeThreadLocal,
+ 1024 * 1024L,
perQueryStats
);
closer.register(grouper);
@@ -257,6 +258,7 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
4,
parallelCombineThreads,
mergeThreadLocal,
+ 1024 * 1024L,
perQueryStats
);
closer.register(grouper);
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouperTest.java
new file mode 100644
index 00000000000..0fa77f988ad
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouperTest.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.groupby.GroupByStatsProvider;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SpillingGrouperTest extends InitializedNullHandlingTest
+{
+ private static final AggregatorFactory[] AGGREGATOR_FACTORIES = new
AggregatorFactory[]{
+ new LongSumAggregatorFactory("valueSum", "value"),
+ new CountAggregatorFactory("count")
+ };
+ private static final int KEY_SIZE = new IntKeySerde().keySize();
+ private static final float MAX_LOAD_FACTOR = 0.75f;
+ private static final int INITIAL_BUCKETS = 4;
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testNoSpilling() throws IOException
+ {
+ final File storageDir = temporaryFolder.newFolder();
+ // Only 3 keys with a 10,000-byte buffer. Everything fits in memory.
+ try (SpillingGrouper<IntKey> grouper = makeGrouper(10000, storageDir, 1024
* 1024, 100)) {
+ for (int i = 0; i < 3; i++) {
+ Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+ }
+
+ assertResultsCorrect(grouper, 3, 1);
+ Assert.assertEquals(0, storageDir.listFiles().length);
+ }
+ }
+
+ @Test
+ public void testSpillAndIterateSorted() throws IOException
+ {
+ final File storageDir = temporaryFolder.newFolder();
+ final int numKeys = 100;
+ // 100 unique keys force many spills since buffer is only 50 bytes. With iterator(true), results should be sorted ascending by key.
+ try (SpillingGrouper<IntKey> grouper = makeGrouper(50, storageDir, 1024 *
1024, 100)) {
+ for (int i = 0; i < numKeys; i++) {
+ Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+ }
+
+ try (CloseableIterator<Grouper.Entry<IntKey>> iterator =
grouper.iterator(true)) {
+ Assert.assertTrue("spilling should have occurred",
storageDir.listFiles().length > 0);
+ int prevKey = -1;
+ int count = 0;
+ while (iterator.hasNext()) {
+ Grouper.Entry<IntKey> entry = iterator.next();
+ Assert.assertTrue(
+ "keys should be sorted ascending",
+ entry.getKey().intValue() > prevKey
+ );
+ prevKey = entry.getKey().intValue();
+ Assert.assertEquals(1L, entry.getValues()[0]);
+ Assert.assertEquals(1L, entry.getValues()[1]);
+ count++;
+ }
+ Assert.assertEquals(numKeys, count);
+ }
+ }
+ }
+
+ @Test
+ public void testSpillAndIterateUnsorted() throws IOException
+ {
+ final File storageDir = temporaryFolder.newFolder();
+ final int numKeys = 100;
+ // 100 unique keys force many spills since buffer is only 50 bytes. With iterator(false), results may be in any order, but all keys should be present with correct values.
+ try (SpillingGrouper<IntKey> grouper = makeGrouper(50, storageDir, 1024 *
1024, 100)) {
+ for (int i = 0; i < numKeys; i++) {
+ Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+ }
+
+ assertResultsCorrect(grouper, numKeys, 1);
+ Assert.assertTrue("spilling should have occurred",
storageDir.listFiles().length > 0);
+ }
+ }
+
+ @Test
+ public void testAggregatesDuplicateKeys() throws IOException
+ {
+ // SpillingGrouper doesn't combine across spills: duplicate keys from different spill files
+ // appear as separate entries in the sorted iterator. Verify that the total aggregate values
+ // per key sum to the expected amount across all entries.
+ final File storageDir = temporaryFolder.newFolder();
+ final int numKeys = 20;
+ final int duplicates = 5;
+ try (SpillingGrouper<IntKey> grouper = makeGrouper(50, storageDir, 1024 *
1024, 100)) {
+ for (int round = 0; round < duplicates; round++) {
+ for (int i = 0; i < numKeys; i++) {
+ Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+ }
+ }
+
+ int totalEntries = 0;
+ final Map<Integer, Long> totalCounts = new HashMap<>();
+ try (CloseableIterator<Grouper.Entry<IntKey>> iterator = grouper.iterator(true)) {
+ Assert.assertTrue("spilling should have occurred", storageDir.listFiles().length > 0);
+ while (iterator.hasNext()) {
+ Grouper.Entry<IntKey> entry = iterator.next();
+ totalCounts.merge(entry.getKey().intValue(), (Long) entry.getValues()[1], Long::sum);
+ totalEntries++;
+ }
+ }
+ Assert.assertTrue(
+ "duplicate keys should exist across spills, so total entries (" + totalEntries
+ + ") should exceed unique key count (" + numKeys + ")",
+ totalEntries > numKeys
+ );
+ Assert.assertEquals(numKeys, totalCounts.size());
+ for (Map.Entry<Integer, Long> e : totalCounts.entrySet()) {
+ Assert.assertEquals(
+ "total count for key " + e.getKey(),
+ (long) duplicates,
+ (long) e.getValue()
+ );
+ }
+ }
+ }
+
+ @Test
+ public void testSmallSpillsAreBatched() throws IOException
+ {
+ final File storageDir = temporaryFolder.newFolder();
+ final int bufferSize = 50;
+ final int numKeys = 100;
+
+ int maxUsableEntries = computeMaxUsableEntries(bufferSize);
+ Assert.assertEquals(
+ "buffer should hold at most 1 entry, guaranteeing a spill on every key",
+ 1,
+ maxUsableEntries
+ );
+
+ try (SpillingGrouper<IntKey> grouper = makeGrouper(bufferSize, storageDir, 1024 * 1024, 100)) {
+ for (int i = 0; i < numKeys; i++) {
+ Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+ }
+
+ assertResultsCorrect(grouper, numKeys, 1);
+
+ File[] files = storageDir.listFiles();
+ Assert.assertNotNull(files);
+ Assert.assertEquals(
+ "all spills are tiny and should batch into a single data + dictionary file pair",
+ 2,
+ files.length
+ );
+ }
+ }
+
+ @Test
+ public void testDiskQuotaReclaimedWhenSmallSpillsDeleted() throws IOException
+ {
+ final File storageDir = temporaryFolder.newFolder();
+ final LimitedTemporaryStorage temporaryStorage =
+ new LimitedTemporaryStorage(storageDir, 1024 * 1024, 100, new GroupByStatsProvider.PerQueryStats());
+ final int bufferSize = 50;
+ final int numKeys = 100;
+
+ int maxUsableEntries = computeMaxUsableEntries(bufferSize);
+ Assert.assertEquals(
+ "buffer should hold at most 1 entry, guaranteeing a spill on every key",
+ 1,
+ maxUsableEntries
+ );
+
+ try (SpillingGrouper<IntKey> grouper = makeGrouper(bufferSize, temporaryStorage)) {
+ for (int i = 0; i < numKeys; i++) {
+ Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+ }
+
+ // Before iterator(): small spills were created and deleted during batching, so the
+ // temporary storage should have reclaimed their bytes. Only the final merged file(s)
+ // from flushPendingRunsToDisk() should remain on disk.
+ long sizeBeforeIterator = temporaryStorage.currentSize();
+ int fileCountBeforeIterator = temporaryStorage.currentFileCount();
+
+ // With a 50-byte buffer and 100 keys, many individual spills occur. Batching deletes
+ // each small temp file immediately, so the file count should be much less than numKeys.
+ Assert.assertTrue(
+ "file count (" + fileCountBeforeIterator + ") should be much less than numKeys (" + numKeys
+ + ") because small spill files are deleted after being read into memory",
+ fileCountBeforeIterator < numKeys
+ );
+
+ // The tracked bytes should reflect only the files still on disk, not the deleted ones.
+ long actualDiskBytes = 0;
+ File[] diskFiles = storageDir.listFiles();
+ Assert.assertNotNull(diskFiles);
+ for (File f : diskFiles) {
+ actualDiskBytes += f.length();
+ }
+ Assert.assertEquals(
+ "tracked bytes should match actual bytes on disk",
+ actualDiskBytes,
+ sizeBeforeIterator
+ );
+
+ // Calling iterator() flushes remaining pending runs; verify results are still correct.
+ assertResultsCorrect(grouper, numKeys, 1);
+
+ // After iterator, check that the final state is also consistent.
+ long sizeAfterIterator = temporaryStorage.currentSize();
+ long actualDiskBytesAfter = 0;
+ File[] diskFilesAfter = storageDir.listFiles();
+ Assert.assertNotNull(diskFilesAfter);
+ for (File f : diskFilesAfter) {
+ actualDiskBytesAfter += f.length();
+ }
+ Assert.assertEquals(
+ "tracked bytes should match actual bytes on disk after iterator",
+ actualDiskBytesAfter,
+ sizeAfterIterator
+ );
+ }
+ }
+
+ @Test
+ public void testResetClearsPendingState() throws IOException
+ {
+ try (SpillingGrouper<IntKey> grouper = makeGrouper(50, temporaryFolder.newFolder(), 1024 * 1024, 100)) {
+ for (int i = 0; i < 50; i++) {
+ Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+ }
+
+ grouper.reset();
+
+ for (int i = 1000; i < 1010; i++) {
+ Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+ }
+
+ try (CloseableIterator<Grouper.Entry<IntKey>> iterator = grouper.iterator(true)) {
+ int count = 0;
+ while (iterator.hasNext()) {
+ Grouper.Entry<IntKey> entry = iterator.next();
+ Assert.assertTrue(
+ "keys should be >= 1000 after reset",
+ entry.getKey().intValue() >= 1000
+ );
+ count++;
+ }
+ Assert.assertEquals(10, count);
+ }
+ }
+ }
+
+ @Test
+ public void testDiskFull() throws IOException
+ {
+ try (SpillingGrouper<IntKey> grouper = makeGrouper(50, temporaryFolder.newFolder(), 10, 100)) {
+ AggregateResult lastResult = AggregateResult.ok();
+ for (int i = 0; i < 10000 && lastResult.isOk(); i++) {
+ lastResult = grouper.aggregate(new IntKey(i));
+ }
+
+ Assert.assertFalse("should have hit disk full", lastResult.isOk());
+ Assert.assertTrue(
+ "reason should mention disk space",
+ lastResult.getReason().contains("Not enough disk space")
+ );
+ }
+ }
+
+ @Test
+ public void testMaxSpillFileCount() throws IOException
+ {
+ // With batching, small spill files are created and immediately deleted, so the file count
+ // stays low. The limit is hit when flushPendingRunsToDisk() creates the merged data file
+ // (succeeds as file #1) and then tries to create the dictionary file (fails because
+ // maxFileCount=1 is already reached). Enough keys are needed to accumulate >= 1MB of pending bytes.
+ //
+ // Without batching, the file limit would be hit on the 2nd spill, so only a handful of keys
+ // would succeed. With batching, thousands of keys are processed before the flush triggers
+ // the limit. We assert keysAggregated > maxUsableEntries * 2 to prove batching was active.
+ final int bufferSize = 50;
+ final int maxUsableEntries = computeMaxUsableEntries(bufferSize);
+ try (SpillingGrouper<IntKey> grouper = makeGrouper(bufferSize, temporaryFolder.newFolder(), 10 * 1024 * 1024, 1)) {
+ AggregateResult lastResult = AggregateResult.ok();
+ int keysAggregated = 0;
+ for (int i = 0; i < 200_000 && lastResult.isOk(); i++) {
+ lastResult = grouper.aggregate(new IntKey(i));
+ if (lastResult.isOk()) {
+ keysAggregated++;
+ }
+ }
+
+ Assert.assertFalse("should have hit max file count", lastResult.isOk());
+ Assert.assertTrue(
+ "reason should mention spill file count",
+ lastResult.getReason().contains("Maximum number of spill files")
+ );
+ Assert.assertTrue(
+ "batching should allow many keys (" + keysAggregated + ") before hitting file limit;"
+ + " without batching only ~" + (maxUsableEntries * 2) + " would succeed",
+ keysAggregated > maxUsableEntries * 2
+ );
+ }
+ }
+
+ private SpillingGrouper<IntKey> makeGrouper(
+ int bufferSize,
+ File storageDir,
+ long maxStorageBytes,
+ int maxFileCount
+ )
+ {
+ return makeGrouper(
+ bufferSize,
+ new LimitedTemporaryStorage(storageDir, maxStorageBytes, maxFileCount, new GroupByStatsProvider.PerQueryStats())
+ );
+ }
+
+ private SpillingGrouper<IntKey> makeGrouper(
+ int bufferSize,
+ LimitedTemporaryStorage temporaryStorage
+ )
+ {
+ final GroupByTestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
+ columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 1L)));
+
+ final SpillingGrouper<IntKey> grouper = new SpillingGrouper<>(
+ Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)),
+ new IntKeySerdeFactory(),
+ columnSelectorFactory,
+ AGGREGATOR_FACTORIES,
+ Integer.MAX_VALUE,
+ MAX_LOAD_FACTOR,
+ INITIAL_BUCKETS,
+ temporaryStorage,
+ new ObjectMapper(),
+ true,
+ null,
+ false,
+ bufferSize,
+ 1024 * 1024L,
+ new GroupByStatsProvider.PerQueryStats()
+ );
+ grouper.init();
+ return grouper;
+ }
+
+ private void assertResultsCorrect(
+ SpillingGrouper<IntKey> grouper,
+ int expectedKeys,
+ long expectedCountPerKey
+ ) throws IOException
+ {
+ final Map<Integer, Object[]> results = new HashMap<>();
+ try (CloseableIterator<Grouper.Entry<IntKey>> iterator = grouper.iterator(true)) {
+ while (iterator.hasNext()) {
+ Grouper.Entry<IntKey> entry = iterator.next();
+ int key = entry.getKey().intValue();
+ Object[] valuesCopy = new Object[entry.getValues().length];
+ System.arraycopy(entry.getValues(), 0, valuesCopy, 0, valuesCopy.length);
+ Assert.assertNull("duplicate key in results: " + key, results.put(key, valuesCopy));
+ }
+ }
+ Assert.assertEquals(expectedKeys, results.size());
+ for (Map.Entry<Integer, Object[]> e : results.entrySet()) {
+ Assert.assertEquals(
+ "valueSum for key " + e.getKey(),
+ expectedCountPerKey,
+ e.getValue()[0]
+ );
+ Assert.assertEquals(
+ "count for key " + e.getKey(),
+ expectedCountPerKey,
+ e.getValue()[1]
+ );
+ }
+ }
+
+ private static int computeMaxUsableEntries(int bufferSize)
+ {
+ int aggSize = 0;
+ for (AggregatorFactory factory : AGGREGATOR_FACTORIES) {
+ aggSize += factory.getMaxIntermediateSizeWithNulls();
+ }
+ int bucketSizeWithHash = Integer.BYTES + KEY_SIZE + aggSize;
+ int maxBuckets = Math.min(bufferSize / bucketSizeWithHash, INITIAL_BUCKETS);
+ return (int) (maxBuckets * MAX_LOAD_FACTOR);
+ }
+
+ static class IntKeySerdeFactory implements Grouper.KeySerdeFactory<IntKey>
+ {
+ @Override
+ public long getMaxDictionarySize()
+ {
+ return 0;
+ }
+
+ @Override
+ public Grouper.KeySerde<IntKey> factorize()
+ {
+ return new IntKeySerde();
+ }
+
+ @Override
+ public Grouper.KeySerde<IntKey> factorizeWithDictionary(List<String> dictionary)
+ {
+ return factorize();
+ }
+
+ @Override
+ public IntKey copyKey(IntKey key)
+ {
+ return new IntKey(key.intValue());
+ }
+
+ @Override
+ public Comparator<Grouper.Entry<IntKey>> objectComparator(boolean forceDefaultOrder)
+ {
+ return Comparator.comparingInt(o -> o.getKey().intValue());
+ }
+ }
+}
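The tests above describe the batching behavior only indirectly: small spills are read back into memory, held as pending runs, and written out as one merged file by flushPendingRunsToDisk(), without combining duplicate keys. The following is a minimal, self-contained Java sketch of that idea under stated assumptions; it is not Druid's SpillingGrouper code. The class SpillBatchingSketch, its methods, the 4-bytes-per-entry size model, and the in-memory list standing in for spill files are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch, not Druid's implementation: small sorted spill runs are buffered in
// memory and k-way merged into one "file" once their total size reaches a threshold.
class SpillBatchingSketch
{
  // Assumption: each entry costs one int of storage.
  static final int BYTES_PER_ENTRY = Integer.BYTES;

  private final long flushThresholdBytes;
  private final int maxFileCount;
  private final List<int[]> pendingRuns = new ArrayList<>();
  private long pendingBytes = 0;
  // Stands in for on-disk spill files (hypothetical; nothing is written to disk).
  private final List<int[]> files = new ArrayList<>();

  SpillBatchingSketch(long flushThresholdBytes, int maxFileCount)
  {
    this.flushThresholdBytes = flushThresholdBytes;
    this.maxFileCount = maxFileCount;
  }

  // Accepts one sorted run. Returns false if flushing would exceed the file-count limit.
  boolean spill(int[] sortedRun)
  {
    pendingRuns.add(sortedRun);
    pendingBytes += (long) sortedRun.length * BYTES_PER_ENTRY;
    return pendingBytes < flushThresholdBytes || flushPendingRuns();
  }

  // Merges all pending runs into a single file; duplicate keys are kept, not combined.
  boolean flushPendingRuns()
  {
    if (pendingRuns.isEmpty()) {
      return true;
    }
    if (files.size() >= maxFileCount) {
      return false;
    }
    files.add(mergeRuns(pendingRuns));
    pendingRuns.clear();
    pendingBytes = 0;
    return true;
  }

  // Standard k-way merge of sorted int arrays using a min-heap of {value, runIndex, position}.
  static int[] mergeRuns(List<int[]> runs)
  {
    int total = 0;
    for (int[] run : runs) {
      total += run.length;
    }
    int[] out = new int[total];
    PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
    for (int i = 0; i < runs.size(); i++) {
      if (runs.get(i).length > 0) {
        heap.add(new int[]{runs.get(i)[0], i, 0});
      }
    }
    int n = 0;
    while (!heap.isEmpty()) {
      int[] top = heap.poll();
      out[n++] = top[0];
      int[] run = runs.get(top[1]);
      int next = top[2] + 1;
      if (next < run.length) {
        heap.add(new int[]{run[next], top[1], next});
      }
    }
    return out;
  }

  int fileCount()
  {
    return files.size();
  }

  int[] file(int index)
  {
    return files.get(index);
  }
}
```

For example, with a 16-byte threshold, 100 single-key runs produce 25 merged files instead of 100, and with maxFileCount set to 1 the second flush (on the 8th run) is rejected. Keeping duplicates as separate entries mirrors what testAggregatesDuplicateKeys expects; a real implementation would also have to serialize aggregator values and dictionaries and charge bytes against LimitedTemporaryStorage, which this sketch omits.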