tarun11Mavani commented on code in PR #18760:
URL: https://github.com/apache/pinot/pull/18760#discussion_r3466398850
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapResultExtractionStrategy.java:
##########
@@ -42,14 +49,32 @@ public List<RoaringBitmap>
extractIntermediateResult(DictIdsWrapper dictIdsWrapp
}
return result;
}
- Dictionary dictionary = dictIdsWrapper._dictionary;
List<RoaringBitmap> result = new ArrayList<>(_numSteps);
- for (RoaringBitmap dictIdBitmap : dictIdsWrapper._stepsBitmaps) {
- result.add(convertToValueBitmap(dictionary, dictIdBitmap));
+ if (dictIdsWrapper.isMultiKey()) {
+ for (RoaringBitmap compositeIdBitmap : dictIdsWrapper._stepsBitmaps) {
+ result.add(convertCompositeToValueBitmap(dictIdsWrapper,
compositeIdBitmap));
+ }
+ } else {
+ Dictionary dictionary = dictIdsWrapper._dictionaries[0];
+ for (RoaringBitmap dictIdBitmap : dictIdsWrapper._stepsBitmaps) {
+ result.add(convertToValueBitmap(dictionary, dictIdBitmap));
+ }
}
return result;
}
+ private RoaringBitmap convertCompositeToValueBitmap(DictIdsWrapper wrapper,
RoaringBitmap compositeIdBitmap) {
+ RoaringBitmap valueBitmap = new RoaringBitmap();
+ PeekableIntIterator iterator = compositeIdBitmap.getIntIterator();
+ int numKeys = wrapper._dictionaries.length;
+ int[] dictIds = new int[numKeys];
+ while (iterator.hasNext()) {
+ wrapper.reverseCompositeId(iterator.next(), dictIds);
+ valueBitmap.add(DictIdsWrapper.toCompositeString(wrapper._dictionaries,
dictIds).hashCode());
Review Comment:
Makes sense. I'll replace the `toCompositeString().hashCode()` call with a direct
hash-combining loop over the dictionary values.
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/DictIdsWrapper.java:
##########
@@ -18,19 +18,151 @@
*/
package org.apache.pinot.core.query.aggregation.function.funnel;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.roaringbitmap.RoaringBitmap;
+/**
+ * Holds per-step RoaringBitmaps keyed by correlation dictionary IDs.
+ *
+ * <p>For single-key CORRELATE_BY, stores raw dictionary IDs directly in the
bitmaps (compact, fits in one
+ * RoaringBitmap container for typical segment sizes).
+ *
+ * <p>For multi-key CORRELATE_BY, composite IDs are assigned via stride-based
arithmetic (when the combined key
+ * space fits in int) or a HashMap fallback for large key spaces.
+ */
final class DictIdsWrapper {
- final Dictionary _dictionary;
+ final Dictionary[] _dictionaries;
final RoaringBitmap[] _stepsBitmaps;
+ // Stride-based composite mapping (non-null only for multi-key when product
of dict sizes fits in int)
+ private final int[] _strides;
+
+ // HashMap-based composite mapping (non-null only for multi-key when stride
overflows int)
+ private final Map<IntArrayList, Integer> _compositeKeyMap;
+ private final List<int[]> _compositeKeyReverse;
+ private final IntArrayList _lookupKey;
+
DictIdsWrapper(int numSteps, Dictionary dictionary) {
- _dictionary = dictionary;
+ _dictionaries = new Dictionary[]{dictionary};
_stepsBitmaps = new RoaringBitmap[numSteps];
for (int n = 0; n < numSteps; n++) {
_stepsBitmaps[n] = new RoaringBitmap();
}
+ _strides = null;
+ _compositeKeyMap = null;
+ _compositeKeyReverse = null;
+ _lookupKey = null;
+ }
+
+ DictIdsWrapper(int numSteps, Dictionary[] dictionaries) {
+ _dictionaries = dictionaries;
+ _stepsBitmaps = new RoaringBitmap[numSteps];
+ for (int n = 0; n < numSteps; n++) {
+ _stepsBitmaps[n] = new RoaringBitmap();
+ }
+
+ if (dictionaries.length > 1) {
+ long totalSpace = 1;
+ boolean fitsInInt = true;
+ for (Dictionary d : dictionaries) {
+ totalSpace *= d.length();
+ if (totalSpace > Integer.MAX_VALUE) {
+ fitsInInt = false;
+ break;
+ }
+ }
+
+ if (fitsInInt) {
+ _strides = new int[dictionaries.length];
+ _strides[dictionaries.length - 1] = 1;
+ for (int k = dictionaries.length - 2; k >= 0; k--) {
+ _strides[k] = _strides[k + 1] * dictionaries[k + 1].length();
+ }
+ _compositeKeyMap = null;
+ _compositeKeyReverse = null;
+ _lookupKey = null;
+ } else {
+ _strides = null;
+ _compositeKeyMap = new HashMap<>();
+ _compositeKeyReverse = new ArrayList<>();
+ _lookupKey = new IntArrayList(dictionaries.length);
+ }
+ } else {
+ _strides = null;
+ _compositeKeyMap = null;
+ _compositeKeyReverse = null;
+ _lookupKey = null;
+ }
+ }
+
+ boolean isMultiKey() {
+ return _dictionaries.length > 1;
+ }
+
+ boolean isHashMapPath() {
+ return _compositeKeyMap != null;
+ }
+
+ /**
+ * Maps a tuple of per-column dictionary IDs to a single composite int
suitable for RoaringBitmap.
+ * Only used for multi-key; for single-key, callers should add the dictId
directly.
+ */
+ int getCompositeCorrelationId(int[] dictIds) {
+ if (_strides != null) {
+ int id = 0;
+ for (int k = 0; k < dictIds.length; k++) {
+ id += dictIds[k] * _strides[k];
+ }
+ return id;
+ }
+ _lookupKey.clear();
+ for (int dictId : dictIds) {
+ _lookupKey.add(dictId);
+ }
+ Integer existingId = _compositeKeyMap.get(_lookupKey);
+ if (existingId != null) {
+ return existingId;
+ }
+ IntArrayList insertKey = new IntArrayList(dictIds);
Review Comment:
Agreed.
If we adopt the approach from the comment above, this entire HashMap path goes
away. If we keep it, I can at least eliminate the double store (IntArrayList +
dictIds.clone()). I will address this based on how we resolve comment 2.
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationResult.java:
##########
@@ -50,7 +78,50 @@ public void add(int step, int correlationId) {
_correlatedSteps[step] = true;
}
+ /**
+ * Multi-key add. Data must be sorted by correlationIds[0] (primary key).
+ * Secondary keys are tracked via HashMap within each primary-key group.
+ *
+ * <p>Within a primary-key group, rows for the same (primary, secondary)
combination may appear
+ * non-contiguously (e.g. interleaved with other secondary keys). This is
handled correctly
+ * because the HashMap accumulates all step observations per secondary key
regardless of row order.
+ */
+ public void addMultiKey(int step, int[] correlationIds) {
+ int primaryId = correlationIds[0];
+ if (primaryId != _lastPrimaryId) {
+ flushMultiKeyGroup();
+ _lastPrimaryId = primaryId;
+ _secondaryKeySteps.clear();
+ }
+
+ _lookupKey.clear();
+ for (int k = 1; k < correlationIds.length; k++) {
+ _lookupKey.add(correlationIds[k]);
+ }
+
+ boolean[] steps = _secondaryKeySteps.get(_lookupKey);
+ if (steps == null) {
+ steps = new boolean[_numSteps];
+ _secondaryKeySteps.put(new IntArrayList(_lookupKey), steps);
Review Comment:
The clone is needed because `_lookupKey` is a reusable buffer that is cleared and
refilled (clear() + add()) on every row. Without the clone, the key stored in the
HashMap would be mutated in place on the next row, breaking subsequent lookups.
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationResult.java:
##########
@@ -50,7 +78,50 @@ public void add(int step, int correlationId) {
_correlatedSteps[step] = true;
}
+ /**
+ * Multi-key add. Data must be sorted by correlationIds[0] (primary key).
+ * Secondary keys are tracked via HashMap within each primary-key group.
+ *
+ * <p>Within a primary-key group, rows for the same (primary, secondary)
combination may appear
+ * non-contiguously (e.g. interleaved with other secondary keys). This is
handled correctly
+ * because the HashMap accumulates all step observations per secondary key
regardless of row order.
+ */
+ public void addMultiKey(int step, int[] correlationIds) {
+ int primaryId = correlationIds[0];
+ if (primaryId != _lastPrimaryId) {
+ flushMultiKeyGroup();
+ _lastPrimaryId = primaryId;
+ _secondaryKeySteps.clear();
+ }
+
+ _lookupKey.clear();
Review Comment:
`correlationIds` includes the primary key at index 0 (we only need the secondary
keys from index 1 onward), and `wrap()` aliases the underlying array, which gets
mutated on the next row.
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/DictIdsWrapper.java:
##########
@@ -18,19 +18,151 @@
*/
package org.apache.pinot.core.query.aggregation.function.funnel;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.roaringbitmap.RoaringBitmap;
+/**
+ * Holds per-step RoaringBitmaps keyed by correlation dictionary IDs.
+ *
+ * <p>For single-key CORRELATE_BY, stores raw dictionary IDs directly in the
bitmaps (compact, fits in one
+ * RoaringBitmap container for typical segment sizes).
+ *
+ * <p>For multi-key CORRELATE_BY, composite IDs are assigned via stride-based
arithmetic (when the combined key
+ * space fits in int) or a HashMap fallback for large key spaces.
+ */
final class DictIdsWrapper {
- final Dictionary _dictionary;
+ final Dictionary[] _dictionaries;
final RoaringBitmap[] _stepsBitmaps;
+ // Stride-based composite mapping (non-null only for multi-key when product
of dict sizes fits in int)
+ private final int[] _strides;
+
+ // HashMap-based composite mapping (non-null only for multi-key when stride
overflows int)
+ private final Map<IntArrayList, Integer> _compositeKeyMap;
+ private final List<int[]> _compositeKeyReverse;
+ private final IntArrayList _lookupKey;
+
DictIdsWrapper(int numSteps, Dictionary dictionary) {
- _dictionary = dictionary;
+ _dictionaries = new Dictionary[]{dictionary};
_stepsBitmaps = new RoaringBitmap[numSteps];
for (int n = 0; n < numSteps; n++) {
_stepsBitmaps[n] = new RoaringBitmap();
}
+ _strides = null;
+ _compositeKeyMap = null;
+ _compositeKeyReverse = null;
+ _lookupKey = null;
+ }
+
+ DictIdsWrapper(int numSteps, Dictionary[] dictionaries) {
+ _dictionaries = dictionaries;
+ _stepsBitmaps = new RoaringBitmap[numSteps];
+ for (int n = 0; n < numSteps; n++) {
+ _stepsBitmaps[n] = new RoaringBitmap();
+ }
+
+ if (dictionaries.length > 1) {
+ long totalSpace = 1;
+ boolean fitsInInt = true;
+ for (Dictionary d : dictionaries) {
+ totalSpace *= d.length();
+ if (totalSpace > Integer.MAX_VALUE) {
+ fitsInInt = false;
+ break;
+ }
+ }
+
+ if (fitsInInt) {
+ _strides = new int[dictionaries.length];
+ _strides[dictionaries.length - 1] = 1;
+ for (int k = dictionaries.length - 2; k >= 0; k--) {
+ _strides[k] = _strides[k + 1] * dictionaries[k + 1].length();
+ }
+ _compositeKeyMap = null;
+ _compositeKeyReverse = null;
+ _lookupKey = null;
+ } else {
+ _strides = null;
+ _compositeKeyMap = new HashMap<>();
+ _compositeKeyReverse = new ArrayList<>();
+ _lookupKey = new IntArrayList(dictionaries.length);
+ }
+ } else {
+ _strides = null;
+ _compositeKeyMap = null;
+ _compositeKeyReverse = null;
+ _lookupKey = null;
+ }
+ }
+
+ boolean isMultiKey() {
+ return _dictionaries.length > 1;
+ }
+
+ boolean isHashMapPath() {
+ return _compositeKeyMap != null;
+ }
+
+ /**
+ * Maps a tuple of per-column dictionary IDs to a single composite int
suitable for RoaringBitmap.
+ * Only used for multi-key; for single-key, callers should add the dictId
directly.
+ */
+ int getCompositeCorrelationId(int[] dictIds) {
Review Comment:
To make sure I understand: with `((long) primaryDictId << 32) |
hash32(secondaryDictIds)`, the result is a 64-bit value, but RoaringBitmap only
stores 32-bit ints. Are you suggesting we:
(a) use Roaring64Bitmap instead,
(b) hash/cast the long down to 32 bits for the bitmap,
or (c) something else?
Also — this makes the set strategy and partitioned-bitmap strategy
approximate for multi-key (they're exact today since composite IDs are
collision-free). We need to agree on this and document it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]