This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch exact-distinct-count in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit a7ab0fd56978ca4412f86d6c09d86e1d3cb35baf Author: kishoreg <g.kish...@gmail.com> AuthorDate: Sun Aug 16 02:28:55 2020 -0700 Support for exact distinct count for non int data types --- .../common/function/AggregationFunctionType.java | 3 +- .../apache/pinot/core/common/ObjectSerDeUtils.java | 176 +++++++++++++-- .../query/DictionaryBasedAggregationOperator.java | 24 +- .../function/DistinctCountAggregationFunction.java | 243 ++++++++++++++++----- .../DistinctCountMVAggregationFunction.java | 31 +-- .../DistinctRawBloomFilterAggregationFunction.java | 226 +++++++++++++++++++ 6 files changed, 609 insertions(+), 94 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java index fc60ea6..b0db043 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java @@ -61,7 +61,8 @@ public enum AggregationFunctionType { PERCENTILEMV("percentileMV"), PERCENTILEESTMV("percentileEstMV"), PERCENTILETDIGESTMV("percentileTDigestMV"), - DISTINCT("distinct"); + DISTINCT("distinct"), + DISTINCTRAWBLOOMFILTER("distinctRawBloomFilter"); private final String _name; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index 9c87921..8995952 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -19,17 +19,31 @@ package org.apache.pinot.core.common; import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import com.google.common.base.Charsets; import com.google.common.primitives.Longs; import com.tdunning.math.stats.MergingDigest; import com.tdunning.math.stats.TDigest; import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import it.unimi.dsi.fastutil.doubles.DoubleIterator; +import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet; +import it.unimi.dsi.fastutil.doubles.DoubleSet; +import it.unimi.dsi.fastutil.floats.FloatIterator; +import it.unimi.dsi.fastutil.floats.FloatOpenHashSet; +import it.unimi.dsi.fastutil.floats.FloatSet; import it.unimi.dsi.fastutil.ints.IntIterator; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; +import it.unimi.dsi.fastutil.longs.LongIterator; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; +import it.unimi.dsi.fastutil.objects.ObjectIterator; +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import it.unimi.dsi.fastutil.objects.ObjectSet; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -68,7 +82,11 @@ public class ObjectSerDeUtils { DistinctTable(11), DataSketch(12), Geometry(13), - RoaringBitmap(14); + RoaringBitmap(14), + LongSet(15), + FloatSet(16), + DoubleSet(17), + BytesSet(18); private final int _value; @@ -111,6 +129,14 @@ public class ObjectSerDeUtils { return ObjectType.Geometry; } else if (value instanceof RoaringBitmap) { return ObjectType.RoaringBitmap; + } else if (value instanceof LongSet) { + return ObjectType.LongSet; + } else if (value instanceof it.unimi.dsi.fastutil.floats.FloatSet) { + return ObjectType.FloatSet; + } else if (value instanceof it.unimi.dsi.fastutil.doubles.DoubleSet) { + return ObjectType.DoubleSet; + } else if (value instanceof ObjectSet) { + return ObjectType.BytesSet; } else { throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName()); } @@ -452,6 +478,135 @@ public class ObjectSerDeUtils { } }; + public static final ObjectSerDe<LongSet> LONG_SET_SER_DE = new ObjectSerDe<LongSet>() { + + @Override + public byte[] serialize(LongSet longSet) { + int size = longSet.size(); + byte[] bytes = new byte[Integer.BYTES + size * Long.BYTES]; + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + byteBuffer.putInt(size); + LongIterator iterator = longSet.iterator(); + while (iterator.hasNext()) { + byteBuffer.putLong(iterator.nextLong()); + } + return bytes; + } + + @Override + public LongSet deserialize(byte[] bytes) { + return deserialize(ByteBuffer.wrap(bytes)); + } + + @Override + public LongSet deserialize(ByteBuffer byteBuffer) { + int size = byteBuffer.getInt(); + LongSet longSet = new LongOpenHashSet(size); + for (int i = 0; i < size; i++) { + longSet.add(byteBuffer.getLong()); + } + return longSet; + } + }; + + public static final ObjectSerDe<FloatSet> FLOAT_SET_SER_DE = new ObjectSerDe<FloatSet>() { + + @Override + public byte[] serialize(FloatSet floatSet) { + int size = floatSet.size(); + byte[] bytes = new byte[Integer.BYTES + size * Long.BYTES]; + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + byteBuffer.putInt(size); + FloatIterator iterator = floatSet.iterator(); + while (iterator.hasNext()) { + byteBuffer.putFloat(iterator.nextFloat()); + } + return bytes; + } + + @Override + public FloatSet deserialize(byte[] bytes) { + return deserialize(ByteBuffer.wrap(bytes)); + } + + @Override + public FloatSet deserialize(ByteBuffer byteBuffer) { + int size = byteBuffer.getInt(); + FloatSet floatSet = new FloatOpenHashSet(size); + for (int i = 0; i < size; i++) { + floatSet.add(byteBuffer.getLong()); + } + return floatSet; + } + }; + + public static final ObjectSerDe<DoubleSet> DOUBLE_SET_SER_DE = new ObjectSerDe<DoubleSet>() { + + @Override + public byte[] serialize(DoubleSet doubleSet) { + int size = doubleSet.size(); + byte[] bytes = new byte[Integer.BYTES + size * Long.BYTES]; + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + byteBuffer.putInt(size); + DoubleIterator iterator = doubleSet.iterator(); + while (iterator.hasNext()) { + byteBuffer.putDouble(iterator.nextDouble()); + } + return bytes; + } + + @Override + public DoubleSet deserialize(byte[] bytes) { + return deserialize(ByteBuffer.wrap(bytes)); + } + + @Override + public DoubleSet deserialize(ByteBuffer byteBuffer) { + int size = byteBuffer.getInt(); + DoubleSet doubleSet = new DoubleOpenHashSet(size); + for (int i = 0; i < size; i++) { + doubleSet.add(byteBuffer.getDouble()); + } + return doubleSet; + } + }; + + public static final ObjectSerDe<ObjectSet<byte[]>> BYTES_SET_SER_DE = new ObjectSerDe<ObjectSet<byte[]>>() { + + @Override + public byte[] serialize(ObjectSet bytesSet) { + int size = bytesSet.size(); + byte[] bytes = new byte[Integer.BYTES + size * Long.BYTES]; + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + byteBuffer.putInt(size); + ObjectIterator<byte[]> iterator = bytesSet.iterator(); + while (iterator.hasNext()) { + byte[] val = iterator.next(); + byteBuffer.putInt(val.length); + byteBuffer.put(val); + } + return bytes; + } + + @Override + public ObjectSet<byte[]> deserialize(byte[] bytes) { + return deserialize(ByteBuffer.wrap(bytes)); + } + + @Override + public ObjectSet<byte[]> deserialize(ByteBuffer byteBuffer) { + int size = byteBuffer.getInt(); + ObjectOpenHashSet<byte[]> bytesSet = new ObjectOpenHashSet<>(size); + for (int i = 0; i < size; i++) { + int length = byteBuffer.getInt(); + byte[] val = new byte[length]; + byteBuffer.get(val); + bytesSet.add(val); + } + return bytesSet; + } + }; + public static final ObjectSerDe<TDigest> TDIGEST_SER_DE = new ObjectSerDe<TDigest>() { @Override @@ -538,23 +693,8 @@ public class ObjectSerDeUtils { // NOTE: DO NOT change the order, it has to be the same order as the ObjectType //@formatter:off - private static final ObjectSerDe[] SER_DES = { - STRING_SER_DE, - LONG_SER_DE, - DOUBLE_SER_DE, - DOUBLE_ARRAY_LIST_SER_DE, - AVG_PAIR_SER_DE, - MIN_MAX_RANGE_PAIR_SER_DE, - HYPER_LOG_LOG_SER_DE, - QUANTILE_DIGEST_SER_DE, - MAP_SER_DE, - INT_SET_SER_DE, - TDIGEST_SER_DE, - DISTINCT_TABLE_SER_DE, - DATA_SKETCH_SER_DE, - GEOMETRY_SER_DE, - ROARING_BITMAP_SER_DE - }; + private static final ObjectSerDe[] SER_DES = + {STRING_SER_DE, LONG_SER_DE, DOUBLE_SER_DE, DOUBLE_ARRAY_LIST_SER_DE, AVG_PAIR_SER_DE, MIN_MAX_RANGE_PAIR_SER_DE, HYPER_LOG_LOG_SER_DE, QUANTILE_DIGEST_SER_DE, MAP_SER_DE, INT_SET_SER_DE, TDIGEST_SER_DE, DISTINCT_TABLE_SER_DE, DATA_SKETCH_SER_DE, GEOMETRY_SER_DE, ROARING_BITMAP_SER_DE, LONG_SET_SER_DE, FLOAT_SET_SER_DE, DOUBLE_SET_SER_DE, BYTES_SET_SER_DE}; //@formatter:on public static byte[] serialize(Object value) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java index 7fa6798..b83a202 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java @@ -18,7 +18,13 @@ */ package org.apache.pinot.core.operator.query; +import com.google.common.base.Charsets; +import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet; +import it.unimi.dsi.fastutil.floats.FloatOpenHashSet; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import java.util.AbstractCollection; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -77,36 +83,42 @@ public class DictionaryBasedAggregationOperator extends BaseOperator<Intermediat .add(new MinMaxRangePair(dictionary.getDoubleValue(0), dictionary.getDoubleValue(dictionarySize - 1))); break; case DISTINCTCOUNT: - IntOpenHashSet set = new IntOpenHashSet(dictionarySize); + AbstractCollection set; switch (dictionary.getValueType()) { case INT: + set = new IntOpenHashSet(dictionarySize); for (int dictId = 0; dictId < dictionarySize; dictId++) { set.add(dictionary.getIntValue(dictId)); } break; case LONG: + set = new LongOpenHashSet(dictionarySize); for (int dictId = 0; dictId < dictionarySize; dictId++) { - set.add(Long.hashCode(dictionary.getLongValue(dictId))); + set.add(dictionary.getLongValue(dictId)); } break; case FLOAT: + set = new FloatOpenHashSet(dictionarySize); for (int dictId = 0; dictId < dictionarySize; dictId++) { - set.add(Float.hashCode(dictionary.getFloatValue(dictId))); + set.add(dictionary.getFloatValue(dictId)); } break; case DOUBLE: + set = new DoubleOpenHashSet(dictionarySize); for (int dictId = 0; dictId < dictionarySize; dictId++) { - set.add(Double.hashCode(dictionary.getDoubleValue(dictId))); + set.add(dictionary.getDoubleValue(dictId)); } break; case STRING: + set = new ObjectOpenHashSet<byte[]>(dictionarySize); for (int dictId = 0; dictId < dictionarySize; dictId++) { - set.add(dictionary.getStringValue(dictId).hashCode()); + set.add(dictionary.getStringValue(dictId).getBytes(Charsets.UTF_8)); } break; case BYTES: + set = new ObjectOpenHashSet<byte[]>(dictionarySize); for (int dictId = 0; dictId < dictionarySize; dictId++) { - set.add(Arrays.hashCode(dictionary.getBytesValue(dictId))); + set.add(dictionary.getBytesValue(dictId)); } break; default: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java index e8e7e97..59a812f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java @@ -18,8 +18,14 @@ */ package org.apache.pinot.core.query.aggregation.function; +import com.google.common.base.Charsets; +import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet; +import it.unimi.dsi.fastutil.floats.FloatOpenHashSet; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; -import java.util.Arrays; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import java.util.AbstractCollection; +import java.util.Iterator; import java.util.Map; import org.apache.pinot.common.function.AggregationFunctionType; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; @@ -35,7 +41,7 @@ import org.roaringbitmap.PeekableIntIterator; import org.roaringbitmap.RoaringBitmap; -public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<IntOpenHashSet, Integer> { +public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<AbstractCollection, Integer> { public DistinctCountAggregationFunction(ExpressionContext expression) { super(expression); @@ -69,9 +75,10 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation return; } - // For non-dictionary-encoded expression, store hash code of the values into the value set - IntOpenHashSet valueSet = getValueSet(aggregationResultHolder); + // For non-dictionary-encoded expression DataType valueType = blockValSet.getValueType(); + + AbstractCollection valueSet = getValueSet(aggregationResultHolder, valueType); switch (valueType) { case INT: int[] intValues = blockValSet.getIntValuesSV(); @@ -82,31 +89,31 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation case LONG: long[] longValues = blockValSet.getLongValuesSV(); for (int i = 0; i < length; i++) { - valueSet.add(Long.hashCode(longValues[i])); + valueSet.add(longValues[i]); } break; case FLOAT: float[] floatValues = blockValSet.getFloatValuesSV(); for (int i = 0; i < length; i++) { - valueSet.add(Float.hashCode(floatValues[i])); + valueSet.add(floatValues[i]); } break; case DOUBLE: double[] doubleValues = blockValSet.getDoubleValuesSV(); for (int i = 0; i < length; i++) { - valueSet.add(Double.hashCode(doubleValues[i])); + valueSet.add(doubleValues[i]); } break; case STRING: String[] stringValues = blockValSet.getStringValuesSV(); for (int i = 0; i < length; i++) { - valueSet.add(stringValues[i].hashCode()); + valueSet.add(stringValues[i].getBytes(Charsets.UTF_8)); } break; case BYTES: byte[][] bytesValues = blockValSet.getBytesValuesSV(); for (int i = 0; i < length; i++) { - valueSet.add(Arrays.hashCode(bytesValues[i])); + valueSet.add(bytesValues[i]); } break; default: @@ -135,37 +142,37 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation case INT: int[] intValues = blockValSet.getIntValuesSV(); for (int i = 0; i < length; i++) { - getValueSet(groupByResultHolder, groupKeyArray[i]).add(intValues[i]); + getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(intValues[i]); } break; case LONG: long[] longValues = blockValSet.getLongValuesSV(); for (int i = 0; i < length; i++) { - getValueSet(groupByResultHolder, groupKeyArray[i]).add(Long.hashCode(longValues[i])); + getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(longValues[i]); } break; case FLOAT: float[] floatValues = blockValSet.getFloatValuesSV(); for (int i = 0; i < length; i++) { - getValueSet(groupByResultHolder, groupKeyArray[i]).add(Float.hashCode(floatValues[i])); + getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(floatValues[i]); } break; case DOUBLE: double[] doubleValues = blockValSet.getDoubleValuesSV(); for (int i = 0; i < length; i++) { - getValueSet(groupByResultHolder, groupKeyArray[i]).add(Double.hashCode(doubleValues[i])); + getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(doubleValues[i]); } break; case STRING: String[] stringValues = blockValSet.getStringValuesSV(); for (int i = 0; i < length; i++) { - getValueSet(groupByResultHolder, groupKeyArray[i]).add(stringValues[i].hashCode()); + getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(stringValues[i].getBytes(Charsets.UTF_8)); } break; case BYTES: byte[][] bytesValues = blockValSet.getBytesValuesSV(); for (int i = 0; i < length; i++) { - getValueSet(groupByResultHolder, groupKeyArray[i]).add(Arrays.hashCode(bytesValues[i])); + getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(bytesValues[i]); } break; default: @@ -194,37 +201,37 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation case INT: int[] intValues = blockValSet.getIntValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], intValues[i]); + setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], intValues[i]); } break; case LONG: long[] longValues = blockValSet.getLongValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Long.hashCode(longValues[i])); + setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], (longValues[i])); } break; case FLOAT: float[] floatValues = blockValSet.getFloatValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Float.hashCode(floatValues[i])); + setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], floatValues[i]); } break; case DOUBLE: double[] doubleValues = blockValSet.getDoubleValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Double.hashCode(doubleValues[i])); + setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], doubleValues[i]); } break; case STRING: String[] stringValues = blockValSet.getStringValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], stringValues[i].hashCode()); + setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], stringValues[i].getBytes(Charsets.UTF_8)); } break; case BYTES: byte[][] bytesValues = blockValSet.getBytesValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Arrays.hashCode(bytesValues[i])); + setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], bytesValues[i]); } break; default: @@ -233,10 +240,10 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation } @Override - public IntOpenHashSet extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + public AbstractCollection extractAggregationResult(AggregationResultHolder aggregationResultHolder) { Object result = aggregationResultHolder.getResult(); if (result == null) { - return new IntOpenHashSet(); + return emptyCollection(); } if (result instanceof DictIdsWrapper) { @@ -244,15 +251,39 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation return convertToValueSet((DictIdsWrapper) result); } else { // For non-dictionary-encoded expression, directly return the value set - return (IntOpenHashSet) result; + return (AbstractCollection) result; } } + private AbstractCollection emptyCollection() { + return new AbstractCollection() { + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public Object next() { + return null; + } + }; + } + + @Override + public int size() { + return 0; + } + }; + } + @Override - public IntOpenHashSet extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + public AbstractCollection extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { Object result = groupByResultHolder.getResult(groupKey); if (result == null) { - return new IntOpenHashSet(); + return emptyCollection(); } if (result instanceof DictIdsWrapper) { @@ -260,14 +291,52 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation return convertToValueSet((DictIdsWrapper) result); } else { // For non-dictionary-encoded expression, directly return the value set - return (IntOpenHashSet) result; + return (AbstractCollection) result; } } @Override - public IntOpenHashSet merge(IntOpenHashSet intermediateResult1, IntOpenHashSet intermediateResult2) { - intermediateResult1.addAll(intermediateResult2); - return intermediateResult1; + public AbstractCollection merge(AbstractCollection intermediateResult1, AbstractCollection intermediateResult2) { + if (intermediateResult1.getClass().isAssignableFrom(intermediateResult2.getClass())) { + intermediateResult1.addAll(intermediateResult2); + return intermediateResult1; + } else { + //handle backwards compatibility, we used to use IntHashSet for all datatypes earlier + //so we try to convert other types into int using hashcode + //Note this code path is executed only while brokers and servers are getting upgraded. + //When both are on the same version, they will satisfy the intermediateResult1.getClass().isAssignableFrom(intermediateResult2.getClass() condition + IntOpenHashSet intOpenHashSet; + AbstractCollection toMerge; + if (intermediateResult1 instanceof IntOpenHashSet) { + intOpenHashSet = (IntOpenHashSet) intermediateResult1; + toMerge = intermediateResult2; + } else { + intOpenHashSet = (IntOpenHashSet) intermediateResult2; + toMerge = intermediateResult1; + } + if (toMerge instanceof LongOpenHashSet) { + LongOpenHashSet longOpenHashSet = (LongOpenHashSet) toMerge; + for (long e : longOpenHashSet) { + intOpenHashSet.add(Long.hashCode(e)); + } + } else if (toMerge instanceof FloatOpenHashSet) { + FloatOpenHashSet floatOpenHashSet = (FloatOpenHashSet) toMerge; + for (float e : floatOpenHashSet) { + intOpenHashSet.add(Float.hashCode(e)); + } + } else if (toMerge instanceof DoubleOpenHashSet) { + DoubleOpenHashSet doubleOpenHashSet = (DoubleOpenHashSet) toMerge; + for (double e : doubleOpenHashSet) { + intOpenHashSet.add(Double.hashCode(e)); + } + } else if (toMerge instanceof ObjectOpenHashSet) { + ObjectOpenHashSet objectOpenHashSet = (ObjectOpenHashSet) toMerge; + for (Object e : objectOpenHashSet) { + intOpenHashSet.add(e.hashCode()); + } + } + return intOpenHashSet; + } } @Override @@ -286,7 +355,7 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation } @Override - public Integer extractFinalResult(IntOpenHashSet intermediateResult) { + public Integer extractFinalResult(AbstractCollection intermediateResult) { return intermediateResult.size(); } @@ -306,15 +375,42 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation /** * Returns the value set from the result holder or creates a new one if it does not exist. */ - protected static IntOpenHashSet getValueSet(AggregationResultHolder aggregationResultHolder) { - IntOpenHashSet valueSet = aggregationResultHolder.getResult(); + protected static AbstractCollection getValueSet(AggregationResultHolder aggregationResultHolder, DataType valueType) { + AbstractCollection valueSet = aggregationResultHolder.getResult(); if (valueSet == null) { - valueSet = new IntOpenHashSet(); + valueSet = getAbstractCollection(valueType); aggregationResultHolder.setValue(valueSet); } return valueSet; } + private static AbstractCollection getAbstractCollection(DataType valueType) { + AbstractCollection valueSet; + switch (valueType) { + case INT: + valueSet = new IntOpenHashSet(); + break; + case LONG: + valueSet = new LongOpenHashSet(); + break; + case FLOAT: + valueSet = new FloatOpenHashSet(); + break; + case DOUBLE: + valueSet = new DoubleOpenHashSet(); + break; + case STRING: + valueSet = new ObjectOpenHashSet<byte[]>(); + break; + case BYTES: + valueSet = new ObjectOpenHashSet<byte[]>(); + break; + default: + throw new IllegalStateException("Illegal data type for DISTINCT_COUNT aggregation function: " + valueType); + } + return valueSet; + } + /** * Returns the dictionary id bitmap for the given group key or creates a new one if it does not exist. */ @@ -331,10 +427,11 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation /** * Returns the value set for the given group key or creates a new one if it does not exist. */ - protected static IntOpenHashSet getValueSet(GroupByResultHolder groupByResultHolder, int groupKey) { - IntOpenHashSet valueSet = groupByResultHolder.getResult(groupKey); + protected static AbstractCollection getValueSet(GroupByResultHolder groupByResultHolder, int groupKey, + DataType valueType) { + AbstractCollection valueSet = groupByResultHolder.getResult(groupKey); if (valueSet == null) { - valueSet = new IntOpenHashSet(); + valueSet = getAbstractCollection(valueType); groupByResultHolder.setValueForKey(groupKey, valueSet); } return valueSet; @@ -353,9 +450,38 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation /** * Helper method to set value for the given group keys into the result holder. */ - private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, int value) { + private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, DataType valueType, int[] groupKeys, + int value) { + for (int groupKey : groupKeys) { + getValueSet(groupByResultHolder, groupKey, valueType).add(value); + } + } + + private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, DataType valueType, int[] groupKeys, + long value) { + for (int groupKey : groupKeys) { + getValueSet(groupByResultHolder, groupKey, valueType).add(value); + } + } + + private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, DataType valueType, int[] groupKeys, + float value) { + for (int groupKey : groupKeys) { + getValueSet(groupByResultHolder, groupKey, valueType).add(value); + } + } + + private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, DataType valueType, int[] groupKeys, + double value) { + for (int groupKey : groupKeys) { + getValueSet(groupByResultHolder, groupKey, valueType).add(value); + } + } + + private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, DataType valueType, int[] groupKeys, + byte[] value) { for (int groupKey : groupKeys) { - getValueSet(groupByResultHolder, groupKey).add(value); + getValueSet(groupByResultHolder, groupKey, valueType).add(value); } } @@ -363,47 +489,56 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation * Helper method to read dictionary and convert dictionary ids to hash code of the values for dictionary-encoded * expression. */ - private static IntOpenHashSet convertToValueSet(DictIdsWrapper dictIdsWrapper) { + private static AbstractCollection convertToValueSet(DictIdsWrapper dictIdsWrapper) { Dictionary dictionary = dictIdsWrapper._dictionary; RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap; - IntOpenHashSet valueSet = new IntOpenHashSet(dictIdBitmap.getCardinality()); PeekableIntIterator iterator = dictIdBitmap.getIntIterator(); DataType valueType = dictionary.getValueType(); switch (valueType) { case INT: + IntOpenHashSet intOpenHashSet = new IntOpenHashSet(dictIdBitmap.getCardinality()); while (iterator.hasNext()) { - valueSet.add(dictionary.getIntValue(iterator.next())); + intOpenHashSet.add(dictionary.getIntValue(iterator.next())); } - break; + return intOpenHashSet; case LONG: + LongOpenHashSet longOpenHashSet = new LongOpenHashSet(dictIdBitmap.getCardinality()); while (iterator.hasNext()) { - valueSet.add(Long.hashCode(dictionary.getLongValue(iterator.next()))); + longOpenHashSet.add(dictionary.getLongValue(iterator.next())); } - break; + return longOpenHashSet; case FLOAT: + FloatOpenHashSet floatOpenHashSet = new FloatOpenHashSet(dictIdBitmap.getCardinality()); while (iterator.hasNext()) { - valueSet.add(Float.hashCode(dictionary.getFloatValue(iterator.next()))); + floatOpenHashSet.add(dictionary.getFloatValue(iterator.next())); } - break; + return floatOpenHashSet; + case DOUBLE: + DoubleOpenHashSet doubleOpenHashSet = new DoubleOpenHashSet(dictIdBitmap.getCardinality()); while (iterator.hasNext()) { - valueSet.add(Double.hashCode(dictionary.getDoubleValue(iterator.next()))); + doubleOpenHashSet.add(dictionary.getDoubleValue(iterator.next())); } - break; + return doubleOpenHashSet; case STRING: + ObjectOpenHashSet<byte[]> stringObjectOpenHashSet = + new ObjectOpenHashSet<byte[]>(dictIdBitmap.getCardinality()); while (iterator.hasNext()) { - valueSet.add(dictionary.getStringValue(iterator.next()).hashCode()); + stringObjectOpenHashSet.add(dictionary.getStringValue(iterator.next()).getBytes(Charsets.UTF_8)); } - break; + return stringObjectOpenHashSet; + case BYTES: + ObjectOpenHashSet<byte[]> bytesObjectOpenHashSet = + new ObjectOpenHashSet<byte[]>(dictIdBitmap.getCardinality()); + while (iterator.hasNext()) { - valueSet.add(Arrays.hashCode(dictionary.getBytesValue(iterator.next()))); + bytesObjectOpenHashSet.add((dictionary.getBytesValue(iterator.next()))); } - break; + return bytesObjectOpenHashSet; default: throw new IllegalStateException("Illegal data type for DISTINCT_COUNT aggregation function: " + valueType); } - return valueSet; } private static final class DictIdsWrapper { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java index fb6b2e3..4c93181 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.query.aggregation.function; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import java.util.AbstractCollection; import java.util.Map; import org.apache.pinot.common.function.AggregationFunctionType; import org.apache.pinot.core.common.BlockValSet; @@ -58,8 +59,8 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation } // For non-dictionary-encoded expression, store hash code of the values into the value set - IntOpenHashSet valueSet = getValueSet(aggregationResultHolder); FieldSpec.DataType valueType = blockValSet.getValueType(); + AbstractCollection valueSet = getValueSet(aggregationResultHolder, valueType); switch (valueType) { case INT: int[][] intValues = blockValSet.getIntValuesMV(); @@ -126,7 +127,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation case INT: int[][] intValues = blockValSet.getIntValuesMV(); for (int i = 0; i < length; i++) { - IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]); + AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKeyArray[i], valueType); for (int value : intValues[i]) { valueSet.add(value); } @@ -135,36 +136,36 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation case LONG: long[][] longValues = blockValSet.getLongValuesMV(); for (int i = 0; i < length; i++) { - IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]); + AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKeyArray[i], valueType); for (long value : longValues[i]) { - valueSet.add(Long.hashCode(value)); + valueSet.add(value); } } break; case FLOAT: float[][] floatValues = blockValSet.getFloatValuesMV(); for (int i = 0; i < length; i++) { - IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]); + AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKeyArray[i], valueType); for (float value : floatValues[i]) { - valueSet.add(Float.hashCode(value)); + valueSet.add(value); } } break; case DOUBLE: double[][] doubleValues = blockValSet.getDoubleValuesMV(); for (int i = 0; i < length; i++) { - IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]); + AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKeyArray[i], valueType); for (double value : doubleValues[i]) { - valueSet.add(Double.hashCode(value)); + valueSet.add(value); } } break; case STRING: String[][] stringValues = blockValSet.getStringValuesMV(); for (int i = 0; i < length; i++) { - IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]); + AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKeyArray[i], valueType); for (String value : stringValues[i]) { - valueSet.add(value.hashCode()); + valueSet.add(value); } } break; @@ -197,7 +198,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation int[][] intValues = blockValSet.getIntValuesMV(); for (int i = 0; i < length; i++) { for (int groupKey : groupKeysArray[i]) { - IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey); + AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKey, valueType); for (int value : intValues[i]) { valueSet.add(value); } @@ -208,7 +209,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation long[][] longValues = blockValSet.getLongValuesMV(); for (int i = 0; i < length; i++) { for (int groupKey : groupKeysArray[i]) { - IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey); + AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKey, valueType); for (long value : longValues[i]) { valueSet.add(Long.hashCode(value)); } @@ -219,7 +220,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation float[][] floatValues = blockValSet.getFloatValuesMV(); for (int i = 0; i < length; i++) { for (int groupKey : groupKeysArray[i]) { - IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey); + AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKey, valueType); for (float value : floatValues[i]) { valueSet.add(Float.hashCode(value)); } @@ -230,7 +231,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation double[][] doubleValues = blockValSet.getDoubleValuesMV(); for (int i = 0; i < length; i++) { for (int groupKey : groupKeysArray[i]) { - IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey); + AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKey, valueType); for (double value : doubleValues[i]) { valueSet.add(Double.hashCode(value)); } @@ -241,7 +242,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation String[][] stringValues = blockValSet.getStringValuesMV(); for (int i = 0; i < length; i++) { for (int groupKey : groupKeysArray[i]) { - IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey); + AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKey, valueType); for (String value : stringValues[i]) { valueSet.add(value.hashCode()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctRawBloomFilterAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctRawBloomFilterAggregationFunction.java new file mode 100644 index 0000000..72e7edc --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctRawBloomFilterAggregationFunction.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.function.AggregationFunctionType; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.RowBasedBlockValueFetcher; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.OrderByExpressionContext; + + +/** + * The DISTINCT clause in SQL is executed as the DISTINCT aggregation function. + * TODO: Support group-by + */ +@SuppressWarnings("rawtypes") +public class DistinctAggregationFunction implements AggregationFunction<DistinctTable, Comparable> { + private final List<ExpressionContext> _expressions; + private final String[] _columns; + private final List<OrderByExpressionContext> _orderByExpressions; + private final int _limit; + + /** + * Constructor for the class. + * + * @param expressions Distinct columns to return + * @param orderByExpressions Order By clause + * @param limit Limit clause + */ + public DistinctAggregationFunction(List<ExpressionContext> expressions, + @Nullable List<OrderByExpressionContext> orderByExpressions, int limit) { + _expressions = expressions; + int numExpressions = expressions.size(); + _columns = new String[numExpressions]; + for (int i = 0; i < numExpressions; i++) { + _columns[i] = expressions.get(i).toString(); + } + _orderByExpressions = orderByExpressions; + _limit = limit; + } + + public String[] getColumns() { + return _columns; + } + + public List<OrderByExpressionContext> getOrderByExpressions() { + return _orderByExpressions; + } + + public int getLimit() { + return _limit; + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.DISTINCT; + } + + @Override + public String getColumnName() { + return AggregationFunctionType.DISTINCT.getName() + "_" + AggregationFunctionUtils.concatArgs(_columns); + } + + @Override + public String getResultColumnName() { + return AggregationFunctionType.DISTINCT.getName().toLowerCase() + "(" + AggregationFunctionUtils + .concatArgs(_columns) + ")"; + } + + @Override + public List<ExpressionContext> getInputExpressions() { + return _expressions; + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + int numBlockValSets = blockValSetMap.size(); + int numExpressions = _expressions.size(); + Preconditions + .checkState(numBlockValSets == numExpressions, "Size mismatch: numBlockValSets = %s, numExpressions = %s", + numBlockValSets, numExpressions); + + BlockValSet[] blockValSets = new BlockValSet[numExpressions]; + for (int i = 0; i < numExpressions; i++) { + blockValSets[i] = blockValSetMap.get(_expressions.get(i)); + } + + DistinctTable distinctTable = aggregationResultHolder.getResult(); + if (distinctTable == null) { + ColumnDataType[] columnDataTypes = new ColumnDataType[numExpressions]; + for (int i = 0; i < numExpressions; i++) { + columnDataTypes[i] = ColumnDataType.fromDataTypeSV(blockValSetMap.get(_expressions.get(i)).getValueType()); + } + DataSchema dataSchema = new DataSchema(_columns, columnDataTypes); + distinctTable = new DistinctTable(dataSchema, _orderByExpressions, _limit); + aggregationResultHolder.setValue(distinctTable); + } + + // TODO: Follow up PR will make few changes to start using DictionaryBasedAggregationOperator for DISTINCT queries + // without filter. + + if (distinctTable.hasOrderBy()) { + // With order-by, no need to check whether the DistinctTable is already satisfied + RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets); + for (int i = 0; i < length; i++) { + distinctTable.addWithOrderBy(new Record(blockValueFetcher.getRow(i))); + } + } else { + // Without order-by, early-terminate when the DistinctTable is already satisfied + if (distinctTable.isSatisfied()) { + return; + } + RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets); + for (int i = 0; i < length; i++) { + if (distinctTable.addWithoutOrderBy(new Record(blockValueFetcher.getRow(i)))) { + return; + } + } + } + } + + @Override + public DistinctTable extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + DistinctTable distinctTable = aggregationResultHolder.getResult(); + if (distinctTable != null) { + return distinctTable; + } else { + ColumnDataType[] columnDataTypes = new ColumnDataType[_columns.length]; + // NOTE: Use STRING for unknown type + Arrays.fill(columnDataTypes, ColumnDataType.STRING); + return new DistinctTable(new DataSchema(_columns, columnDataTypes), _orderByExpressions, _limit); + } + } + + /** + * NOTE: This method only handles merging of 2 main DistinctTables. It should not be used on Broker-side because it + * does not support merging deserialized DistinctTables. + * <p>{@inheritDoc} + */ + @Override + public DistinctTable merge(DistinctTable intermediateResult1, DistinctTable intermediateResult2) { + if (intermediateResult1.size() == 0) { + return intermediateResult2; + } + if (intermediateResult2.size() != 0) { + intermediateResult1.mergeMainDistinctTable(intermediateResult2); + } + return intermediateResult1; + } + + @Override + public boolean isIntermediateResultComparable() { + return false; + } + + @Override + public ColumnDataType getIntermediateResultColumnType() { + return ColumnDataType.OBJECT; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function"); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function"); + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function"); + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function"); + } + + @Override + public DistinctTable extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function"); + } + + @Override + public Comparable extractFinalResult(DistinctTable intermediateResult) { + throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function"); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org