This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch exact-distinct-count
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a7ab0fd56978ca4412f86d6c09d86e1d3cb35baf
Author: kishoreg <g.kish...@gmail.com>
AuthorDate: Sun Aug 16 02:28:55 2020 -0700

    Support for exact distinct count for non int data types
---
 .../common/function/AggregationFunctionType.java   |   3 +-
 .../apache/pinot/core/common/ObjectSerDeUtils.java | 176 +++++++++++++--
 .../query/DictionaryBasedAggregationOperator.java  |  24 +-
 .../function/DistinctCountAggregationFunction.java | 243 ++++++++++++++++-----
 .../DistinctCountMVAggregationFunction.java        |  31 +--
 .../DistinctRawBloomFilterAggregationFunction.java | 226 +++++++++++++++++++
 6 files changed, 609 insertions(+), 94 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index fc60ea6..b0db043 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -61,7 +61,8 @@ public enum AggregationFunctionType {
   PERCENTILEMV("percentileMV"),
   PERCENTILEESTMV("percentileEstMV"),
   PERCENTILETDIGESTMV("percentileTDigestMV"),
-  DISTINCT("distinct");
+  DISTINCT("distinct"),
+  DISTINCTRAWBLOOMFILTER("distinctRawBloomFilter");
 
   private final String _name;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 9c87921..8995952 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -19,17 +19,31 @@
 package org.apache.pinot.core.common;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.google.common.base.Charsets;
 import com.google.common.primitives.Longs;
 import com.tdunning.math.stats.MergingDigest;
 import com.tdunning.math.stats.TDigest;
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.doubles.DoubleIterator;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.doubles.DoubleSet;
+import it.unimi.dsi.fastutil.floats.FloatIterator;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatSet;
 import it.unimi.dsi.fastutil.ints.IntIterator;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -68,7 +82,11 @@ public class ObjectSerDeUtils {
     DistinctTable(11),
     DataSketch(12),
     Geometry(13),
-    RoaringBitmap(14);
+    RoaringBitmap(14),
+    LongSet(15),
+    FloatSet(16),
+    DoubleSet(17),
+    BytesSet(18);
 
     private final int _value;
 
@@ -111,6 +129,14 @@ public class ObjectSerDeUtils {
         return ObjectType.Geometry;
       } else if (value instanceof RoaringBitmap) {
         return ObjectType.RoaringBitmap;
+      } else if (value instanceof LongSet) {
+        return ObjectType.LongSet;
+      } else if (value instanceof it.unimi.dsi.fastutil.floats.FloatSet) {
+        return ObjectType.FloatSet;
+      } else if (value instanceof it.unimi.dsi.fastutil.doubles.DoubleSet) {
+        return ObjectType.DoubleSet;
+      } else if (value instanceof ObjectSet) {
+        return ObjectType.BytesSet;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + 
value.getClass().getSimpleName());
       }
@@ -452,6 +478,135 @@ public class ObjectSerDeUtils {
     }
   };
 
+  /**
+   * SerDe for {@link LongSet}. The serialized form is a 4-byte element count followed by every element written as
+   * an 8-byte long.
+   */
+  public static final ObjectSerDe<LongSet> LONG_SET_SER_DE = new ObjectSerDe<LongSet>() {
+
+    @Override
+    public byte[] serialize(LongSet longSet) {
+      int numValues = longSet.size();
+      byte[] serialized = new byte[Integer.BYTES + numValues * Long.BYTES];
+      ByteBuffer writer = ByteBuffer.wrap(serialized);
+      writer.putInt(numValues);
+      // Use the primitive iterator so that elements are written without boxing
+      for (LongIterator values = longSet.iterator(); values.hasNext(); ) {
+        writer.putLong(values.nextLong());
+      }
+      return serialized;
+    }
+
+    @Override
+    public LongSet deserialize(byte[] bytes) {
+      ByteBuffer wrapped = ByteBuffer.wrap(bytes);
+      return deserialize(wrapped);
+    }
+
+    @Override
+    public LongSet deserialize(ByteBuffer byteBuffer) {
+      // The leading int is the number of elements that follow
+      int numValues = byteBuffer.getInt();
+      LongSet result = new LongOpenHashSet(numValues);
+      int remaining = numValues;
+      while (remaining > 0) {
+        result.add(byteBuffer.getLong());
+        remaining--;
+      }
+      return result;
+    }
+  };
+
+  /**
+   * SerDe for {@link FloatSet}. The serialized form is a 4-byte element count followed by every element written as
+   * a 4-byte float.
+   */
+  public static final ObjectSerDe<FloatSet> FLOAT_SET_SER_DE = new ObjectSerDe<FloatSet>() {
+
+    @Override
+    public byte[] serialize(FloatSet floatSet) {
+      int size = floatSet.size();
+      // Each element occupies Float.BYTES (4), matching the putFloat() calls below
+      byte[] bytes = new byte[Integer.BYTES + size * Float.BYTES];
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+      byteBuffer.putInt(size);
+      FloatIterator iterator = floatSet.iterator();
+      while (iterator.hasNext()) {
+        byteBuffer.putFloat(iterator.nextFloat());
+      }
+      return bytes;
+    }
+
+    @Override
+    public FloatSet deserialize(byte[] bytes) {
+      return deserialize(ByteBuffer.wrap(bytes));
+    }
+
+    @Override
+    public FloatSet deserialize(ByteBuffer byteBuffer) {
+      int size = byteBuffer.getInt();
+      FloatSet floatSet = new FloatOpenHashSet(size);
+      for (int i = 0; i < size; i++) {
+        // Must mirror serialize(): read a 4-byte float, not an 8-byte long
+        floatSet.add(byteBuffer.getFloat());
+      }
+      return floatSet;
+    }
+  };
+
+  /**
+   * SerDe for {@link DoubleSet}. The serialized form is a 4-byte element count followed by every element written as
+   * an 8-byte double.
+   */
+  public static final ObjectSerDe<DoubleSet> DOUBLE_SET_SER_DE = new ObjectSerDe<DoubleSet>() {
+
+    @Override
+    public byte[] serialize(DoubleSet doubleSet) {
+      int numValues = doubleSet.size();
+      byte[] serialized = new byte[Integer.BYTES + numValues * Double.BYTES];
+      ByteBuffer writer = ByteBuffer.wrap(serialized);
+      writer.putInt(numValues);
+      // Use the primitive iterator so that elements are written without boxing
+      for (DoubleIterator values = doubleSet.iterator(); values.hasNext(); ) {
+        writer.putDouble(values.nextDouble());
+      }
+      return serialized;
+    }
+
+    @Override
+    public DoubleSet deserialize(byte[] bytes) {
+      ByteBuffer wrapped = ByteBuffer.wrap(bytes);
+      return deserialize(wrapped);
+    }
+
+    @Override
+    public DoubleSet deserialize(ByteBuffer byteBuffer) {
+      // The leading int is the number of elements that follow
+      int numValues = byteBuffer.getInt();
+      DoubleSet result = new DoubleOpenHashSet(numValues);
+      int remaining = numValues;
+      while (remaining > 0) {
+        result.add(byteBuffer.getDouble());
+        remaining--;
+      }
+      return result;
+    }
+  };
+
+  /**
+   * SerDe for a set of byte arrays. The serialized form is a 4-byte element count, then for every element a 4-byte
+   * length followed by the raw bytes.
+   *
+   * NOTE(review): byte[] compares by identity, so an ObjectOpenHashSet of byte[] does not collapse arrays with equal
+   * content. Confirm whether the producers of these sets need content-based equality.
+   */
+  public static final ObjectSerDe<ObjectSet<byte[]>> BYTES_SET_SER_DE = new ObjectSerDe<ObjectSet<byte[]>>() {
+
+    @Override
+    public byte[] serialize(ObjectSet<byte[]> bytesSet) {
+      int size = bytesSet.size();
+      // Elements are variable-length, so the buffer has to be sized from the actual payload: one int for the count
+      // plus, per element, one int for its length and the bytes themselves.
+      long numBytes = Integer.BYTES;
+      for (byte[] val : bytesSet) {
+        numBytes += Integer.BYTES + val.length;
+      }
+      // Fail loudly instead of silently truncating if the payload does not fit into a single array
+      byte[] bytes = new byte[Math.toIntExact(numBytes)];
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+      byteBuffer.putInt(size);
+      for (byte[] val : bytesSet) {
+        byteBuffer.putInt(val.length);
+        byteBuffer.put(val);
+      }
+      return bytes;
+    }
+
+    @Override
+    public ObjectSet<byte[]> deserialize(byte[] bytes) {
+      return deserialize(ByteBuffer.wrap(bytes));
+    }
+
+    @Override
+    public ObjectSet<byte[]> deserialize(ByteBuffer byteBuffer) {
+      int size = byteBuffer.getInt();
+      ObjectOpenHashSet<byte[]> bytesSet = new ObjectOpenHashSet<>(size);
+      for (int i = 0; i < size; i++) {
+        // Each element is length-prefixed
+        int length = byteBuffer.getInt();
+        byte[] val = new byte[length];
+        byteBuffer.get(val);
+        bytesSet.add(val);
+      }
+      return bytesSet;
+    }
+  };
+
   public static final ObjectSerDe<TDigest> TDIGEST_SER_DE = new 
ObjectSerDe<TDigest>() {
 
     @Override
@@ -538,23 +693,8 @@ public class ObjectSerDeUtils {
 
   // NOTE: DO NOT change the order, it has to be the same order as the 
ObjectType
   //@formatter:off
-  private static final ObjectSerDe[] SER_DES = {
-      STRING_SER_DE,
-      LONG_SER_DE,
-      DOUBLE_SER_DE,
-      DOUBLE_ARRAY_LIST_SER_DE,
-      AVG_PAIR_SER_DE,
-      MIN_MAX_RANGE_PAIR_SER_DE,
-      HYPER_LOG_LOG_SER_DE,
-      QUANTILE_DIGEST_SER_DE,
-      MAP_SER_DE,
-      INT_SET_SER_DE,
-      TDIGEST_SER_DE,
-      DISTINCT_TABLE_SER_DE,
-      DATA_SKETCH_SER_DE,
-      GEOMETRY_SER_DE,
-      ROARING_BITMAP_SER_DE
-  };
+  private static final ObjectSerDe[] SER_DES =
+      {STRING_SER_DE, LONG_SER_DE, DOUBLE_SER_DE, DOUBLE_ARRAY_LIST_SER_DE, 
AVG_PAIR_SER_DE, MIN_MAX_RANGE_PAIR_SER_DE, HYPER_LOG_LOG_SER_DE, 
QUANTILE_DIGEST_SER_DE, MAP_SER_DE, INT_SET_SER_DE, TDIGEST_SER_DE, 
DISTINCT_TABLE_SER_DE, DATA_SKETCH_SER_DE, GEOMETRY_SER_DE, 
ROARING_BITMAP_SER_DE, LONG_SET_SER_DE, FLOAT_SET_SER_DE, DOUBLE_SET_SER_DE, 
BYTES_SET_SER_DE};
   //@formatter:on
 
   public static byte[] serialize(Object value) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
index 7fa6798..b83a202 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
@@ -18,7 +18,13 @@
  */
 package org.apache.pinot.core.operator.query;
 
+import com.google.common.base.Charsets;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.AbstractCollection;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -77,36 +83,42 @@ public class DictionaryBasedAggregationOperator extends 
BaseOperator<Intermediat
               .add(new MinMaxRangePair(dictionary.getDoubleValue(0), 
dictionary.getDoubleValue(dictionarySize - 1)));
           break;
         case DISTINCTCOUNT:
-          IntOpenHashSet set = new IntOpenHashSet(dictionarySize);
+          AbstractCollection set;
           switch (dictionary.getValueType()) {
             case INT:
+              set = new IntOpenHashSet(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
                 set.add(dictionary.getIntValue(dictId));
               }
               break;
             case LONG:
+              set = new LongOpenHashSet(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
-                set.add(Long.hashCode(dictionary.getLongValue(dictId)));
+                set.add(dictionary.getLongValue(dictId));
               }
               break;
             case FLOAT:
+              set = new FloatOpenHashSet(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
-                set.add(Float.hashCode(dictionary.getFloatValue(dictId)));
+                set.add(dictionary.getFloatValue(dictId));
               }
               break;
             case DOUBLE:
+              set = new DoubleOpenHashSet(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
-                set.add(Double.hashCode(dictionary.getDoubleValue(dictId)));
+                set.add(dictionary.getDoubleValue(dictId));
               }
               break;
             case STRING:
+              set = new ObjectOpenHashSet<byte[]>(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
-                set.add(dictionary.getStringValue(dictId).hashCode());
+                
set.add(dictionary.getStringValue(dictId).getBytes(Charsets.UTF_8));
               }
               break;
             case BYTES:
+              set = new ObjectOpenHashSet<byte[]>(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
-                set.add(Arrays.hashCode(dictionary.getBytesValue(dictId)));
+                set.add(dictionary.getBytesValue(dictId));
               }
               break;
             default:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
index e8e7e97..59a812f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
@@ -18,8 +18,14 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import com.google.common.base.Charsets;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import java.util.Arrays;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.AbstractCollection;
+import java.util.Iterator;
 import java.util.Map;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -35,7 +41,7 @@ import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
-public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregationFunction<IntOpenHashSet, Integer> {
+public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregationFunction<AbstractCollection, Integer> {
 
   public DistinctCountAggregationFunction(ExpressionContext expression) {
     super(expression);
@@ -69,9 +75,10 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
       return;
     }
 
-    // For non-dictionary-encoded expression, store hash code of the values 
into the value set
-    IntOpenHashSet valueSet = getValueSet(aggregationResultHolder);
+    // For non-dictionary-encoded expression
     DataType valueType = blockValSet.getValueType();
+
+    AbstractCollection valueSet = getValueSet(aggregationResultHolder, 
valueType);
     switch (valueType) {
       case INT:
         int[] intValues = blockValSet.getIntValuesSV();
@@ -82,31 +89,31 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
       case LONG:
         long[] longValues = blockValSet.getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          valueSet.add(Long.hashCode(longValues[i]));
+          valueSet.add(longValues[i]);
         }
         break;
       case FLOAT:
         float[] floatValues = blockValSet.getFloatValuesSV();
         for (int i = 0; i < length; i++) {
-          valueSet.add(Float.hashCode(floatValues[i]));
+          valueSet.add(floatValues[i]);
         }
         break;
       case DOUBLE:
         double[] doubleValues = blockValSet.getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          valueSet.add(Double.hashCode(doubleValues[i]));
+          valueSet.add(doubleValues[i]);
         }
         break;
       case STRING:
         String[] stringValues = blockValSet.getStringValuesSV();
         for (int i = 0; i < length; i++) {
-          valueSet.add(stringValues[i].hashCode());
+          valueSet.add(stringValues[i].getBytes(Charsets.UTF_8));
         }
         break;
       case BYTES:
         byte[][] bytesValues = blockValSet.getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          valueSet.add(Arrays.hashCode(bytesValues[i]));
+          valueSet.add(bytesValues[i]);
         }
         break;
       default:
@@ -135,37 +142,37 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
       case INT:
         int[] intValues = blockValSet.getIntValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, groupKeyArray[i]).add(intValues[i]);
+          getValueSet(groupByResultHolder, groupKeyArray[i], 
valueType).add(intValues[i]);
         }
         break;
       case LONG:
         long[] longValues = blockValSet.getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, 
groupKeyArray[i]).add(Long.hashCode(longValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i], 
valueType).add(longValues[i]);
         }
         break;
       case FLOAT:
         float[] floatValues = blockValSet.getFloatValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, 
groupKeyArray[i]).add(Float.hashCode(floatValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i], 
valueType).add(floatValues[i]);
         }
         break;
       case DOUBLE:
         double[] doubleValues = blockValSet.getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, 
groupKeyArray[i]).add(Double.hashCode(doubleValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i], 
valueType).add(doubleValues[i]);
         }
         break;
       case STRING:
         String[] stringValues = blockValSet.getStringValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, 
groupKeyArray[i]).add(stringValues[i].hashCode());
+          getValueSet(groupByResultHolder, groupKeyArray[i], 
valueType).add(stringValues[i].getBytes(Charsets.UTF_8));
         }
         break;
       case BYTES:
         byte[][] bytesValues = blockValSet.getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, 
groupKeyArray[i]).add(Arrays.hashCode(bytesValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i], 
valueType).add(bytesValues[i]);
         }
         break;
       default:
@@ -194,37 +201,37 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
       case INT:
         int[] intValues = blockValSet.getIntValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
intValues[i]);
+          setValueForGroupKeys(groupByResultHolder, valueType, 
groupKeysArray[i], intValues[i]);
         }
         break;
       case LONG:
         long[] longValues = blockValSet.getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
Long.hashCode(longValues[i]));
+          setValueForGroupKeys(groupByResultHolder, valueType, 
groupKeysArray[i], (longValues[i]));
         }
         break;
       case FLOAT:
         float[] floatValues = blockValSet.getFloatValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
Float.hashCode(floatValues[i]));
+          setValueForGroupKeys(groupByResultHolder, valueType, 
groupKeysArray[i], floatValues[i]);
         }
         break;
       case DOUBLE:
         double[] doubleValues = blockValSet.getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
Double.hashCode(doubleValues[i]));
+          setValueForGroupKeys(groupByResultHolder, valueType, 
groupKeysArray[i], doubleValues[i]);
         }
         break;
       case STRING:
         String[] stringValues = blockValSet.getStringValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
stringValues[i].hashCode());
+          setValueForGroupKeys(groupByResultHolder, valueType, 
groupKeysArray[i], stringValues[i].getBytes(Charsets.UTF_8));
         }
         break;
       case BYTES:
         byte[][] bytesValues = blockValSet.getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
Arrays.hashCode(bytesValues[i]));
+          setValueForGroupKeys(groupByResultHolder, valueType, 
groupKeysArray[i], bytesValues[i]);
         }
         break;
       default:
@@ -233,10 +240,10 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
   }
 
   @Override
-  public IntOpenHashSet extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+  public AbstractCollection extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
     Object result = aggregationResultHolder.getResult();
     if (result == null) {
-      return new IntOpenHashSet();
+      return emptyCollection();
     }
 
     if (result instanceof DictIdsWrapper) {
@@ -244,15 +251,39 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
       return convertToValueSet((DictIdsWrapper) result);
     } else {
       // For non-dictionary-encoded expression, directly return the value set
-      return (IntOpenHashSet) result;
+      return (AbstractCollection) result;
     }
   }
 
+  /**
+   * Returns the placeholder intermediate result used when nothing has been aggregated.
+   *
+   * An empty {@link IntOpenHashSet} is used (as before exact distinct count was introduced) rather than an
+   * anonymous collection: it is a concrete set type that merge() accepts on either side, and its iterator honors
+   * the Iterator contract instead of returning null from next().
+   */
+  private AbstractCollection emptyCollection() {
+    return new IntOpenHashSet();
+  }
+
   @Override
-  public IntOpenHashSet extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+  public AbstractCollection extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
     Object result = groupByResultHolder.getResult(groupKey);
     if (result == null) {
-      return new IntOpenHashSet();
+      return emptyCollection();
     }
 
     if (result instanceof DictIdsWrapper) {
@@ -260,14 +291,52 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
       return convertToValueSet((DictIdsWrapper) result);
     } else {
       // For non-dictionary-encoded expression, directly return the value set
-      return (IntOpenHashSet) result;
+      return (AbstractCollection) result;
     }
   }
 
   @Override
-  public IntOpenHashSet merge(IntOpenHashSet intermediateResult1, 
IntOpenHashSet intermediateResult2) {
-    intermediateResult1.addAll(intermediateResult2);
-    return intermediateResult1;
+  public AbstractCollection merge(AbstractCollection intermediateResult1, 
AbstractCollection intermediateResult2) {
+    if 
(intermediateResult1.getClass().isAssignableFrom(intermediateResult2.getClass()))
 {
+      intermediateResult1.addAll(intermediateResult2);
+      return intermediateResult1;
+    } else {
+      //handle backwards compatibility, we used to use IntHashSet for all 
datatypes earlier
+      //so we try to convert other types into int using hashcode
+      //Note this code path is executed only while brokers and servers are 
getting upgraded.
+      //When both are on the same version, they will satisfy the 
intermediateResult1.getClass().isAssignableFrom(intermediateResult2.getClass() 
condition
+      IntOpenHashSet intOpenHashSet;
+      AbstractCollection toMerge;
+      if (intermediateResult1 instanceof IntOpenHashSet) {
+        intOpenHashSet = (IntOpenHashSet) intermediateResult1;
+        toMerge = intermediateResult2;
+      } else {
+        intOpenHashSet = (IntOpenHashSet) intermediateResult2;
+        toMerge = intermediateResult1;
+      }
+      if (toMerge instanceof LongOpenHashSet) {
+        LongOpenHashSet longOpenHashSet = (LongOpenHashSet) toMerge;
+        for (long e : longOpenHashSet) {
+          intOpenHashSet.add(Long.hashCode(e));
+        }
+      } else if (toMerge instanceof FloatOpenHashSet) {
+        FloatOpenHashSet floatOpenHashSet = (FloatOpenHashSet) toMerge;
+        for (float e : floatOpenHashSet) {
+          intOpenHashSet.add(Float.hashCode(e));
+        }
+      } else if (toMerge instanceof DoubleOpenHashSet) {
+        DoubleOpenHashSet doubleOpenHashSet = (DoubleOpenHashSet) toMerge;
+        for (double e : doubleOpenHashSet) {
+          intOpenHashSet.add(Double.hashCode(e));
+        }
+      } else if (toMerge instanceof ObjectOpenHashSet) {
+        ObjectOpenHashSet objectOpenHashSet = (ObjectOpenHashSet) toMerge;
+        for (Object e : objectOpenHashSet) {
+          intOpenHashSet.add(e.hashCode());
+        }
+      }
+      return intOpenHashSet;
+    }
   }
 
   @Override
@@ -286,7 +355,7 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
   }
 
   @Override
-  public Integer extractFinalResult(IntOpenHashSet intermediateResult) {
+  public Integer extractFinalResult(AbstractCollection intermediateResult) {
     return intermediateResult.size();
   }
 
@@ -306,15 +375,42 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
   /**
    * Returns the value set from the result holder or creates a new one if it 
does not exist.
    */
-  protected static IntOpenHashSet getValueSet(AggregationResultHolder 
aggregationResultHolder) {
-    IntOpenHashSet valueSet = aggregationResultHolder.getResult();
+  protected static AbstractCollection getValueSet(AggregationResultHolder 
aggregationResultHolder, DataType valueType) {
+    AbstractCollection valueSet = aggregationResultHolder.getResult();
     if (valueSet == null) {
-      valueSet = new IntOpenHashSet();
+      valueSet = getAbstractCollection(valueType);
       aggregationResultHolder.setValue(valueSet);
     }
     return valueSet;
   }
 
+  /**
+   * Creates a new, empty value set whose element type matches the given value type.
+   *
+   * NOTE(review): STRING and BYTES share an ObjectOpenHashSet of byte[]; byte[] compares by identity, so arrays
+   * with equal content are presumably kept as separate elements - confirm this is intended.
+   *
+   * @throws IllegalStateException if the value type is not supported
+   */
+  private static AbstractCollection getAbstractCollection(DataType valueType) {
+    switch (valueType) {
+      case INT:
+        return new IntOpenHashSet();
+      case LONG:
+        return new LongOpenHashSet();
+      case FLOAT:
+        return new FloatOpenHashSet();
+      case DOUBLE:
+        return new DoubleOpenHashSet();
+      case STRING:
+      case BYTES:
+        // Both types are collected as byte arrays
+        return new ObjectOpenHashSet<byte[]>();
+      default:
+        throw new IllegalStateException("Illegal data type for DISTINCT_COUNT aggregation function: " + valueType);
+    }
+  }
+
   /**
    * Returns the dictionary id bitmap for the given group key or creates a new 
one if it does not exist.
    */
@@ -331,10 +427,11 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
   /**
    * Returns the value set for the given group key or creates a new one if it 
does not exist.
    */
-  protected static IntOpenHashSet getValueSet(GroupByResultHolder 
groupByResultHolder, int groupKey) {
-    IntOpenHashSet valueSet = groupByResultHolder.getResult(groupKey);
+  protected static AbstractCollection getValueSet(GroupByResultHolder 
groupByResultHolder, int groupKey,
+      DataType valueType) {
+    AbstractCollection valueSet = groupByResultHolder.getResult(groupKey);
     if (valueSet == null) {
-      valueSet = new IntOpenHashSet();
+      valueSet = getAbstractCollection(valueType);
       groupByResultHolder.setValueForKey(groupKey, valueSet);
     }
     return valueSet;
@@ -353,9 +450,38 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
   /**
    * Helper method to set value for the given group keys into the result 
holder.
    */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, int value) {
+  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, DataType valueType, int[] groupKeys,
+      int value) { // INT overload: adds the value to the value set of every group key
+    for (int groupKey : groupKeys) {
+      getValueSet(groupByResultHolder, groupKey, valueType).add(value); // boxed by the raw add(Object) call
+    }
+  }
+
+  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, DataType valueType, int[] groupKeys,
+      long value) { // LONG overload: adds the value to the value set of every group key
+    for (int groupKey : groupKeys) {
+      getValueSet(groupByResultHolder, groupKey, valueType).add(value); // boxed by the raw add(Object) call
+    }
+  }
+
+  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, DataType valueType, int[] groupKeys,
+      float value) { // FLOAT overload: adds the value to the value set of every group key
+    for (int groupKey : groupKeys) {
+      getValueSet(groupByResultHolder, groupKey, valueType).add(value); // boxed by the raw add(Object) call
+    }
+  }
+
+  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, DataType valueType, int[] groupKeys,
+      double value) { // DOUBLE overload: adds the value to the value set of every group key
+    for (int groupKey : groupKeys) {
+      getValueSet(groupByResultHolder, groupKey, valueType).add(value); // boxed by the raw add(Object) call
+    }
+  }
+
+  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, DataType valueType, int[] groupKeys,
+      byte[] value) { // byte[] overload: the same array instance is added to each group's value set
     for (int groupKey : groupKeys) {
-      getValueSet(groupByResultHolder, groupKey).add(value);
+      getValueSet(groupByResultHolder, groupKey, valueType).add(value); // set is created on first use by getValueSet
     }
   }
 
@@ -363,47 +489,56 @@ public class DistinctCountAggregationFunction extends 
BaseSingleInputAggregation
    * Helper method to read dictionary and convert dictionary ids to hash code 
of the values for dictionary-encoded
    * expression.
    */
-  private static IntOpenHashSet convertToValueSet(DictIdsWrapper 
dictIdsWrapper) {
+  private static AbstractCollection convertToValueSet(DictIdsWrapper 
dictIdsWrapper) {
     Dictionary dictionary = dictIdsWrapper._dictionary;
     RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
-    IntOpenHashSet valueSet = new 
IntOpenHashSet(dictIdBitmap.getCardinality());
     PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
     DataType valueType = dictionary.getValueType();
     switch (valueType) {
       case INT:
+        IntOpenHashSet intOpenHashSet = new 
IntOpenHashSet(dictIdBitmap.getCardinality());
         while (iterator.hasNext()) {
-          valueSet.add(dictionary.getIntValue(iterator.next()));
+          intOpenHashSet.add(dictionary.getIntValue(iterator.next()));
         }
-        break;
+        return intOpenHashSet;
       case LONG:
+        LongOpenHashSet longOpenHashSet = new 
LongOpenHashSet(dictIdBitmap.getCardinality());
         while (iterator.hasNext()) {
-          
valueSet.add(Long.hashCode(dictionary.getLongValue(iterator.next())));
+          longOpenHashSet.add(dictionary.getLongValue(iterator.next()));
         }
-        break;
+        return longOpenHashSet;
       case FLOAT:
+        FloatOpenHashSet floatOpenHashSet = new 
FloatOpenHashSet(dictIdBitmap.getCardinality());
         while (iterator.hasNext()) {
-          
valueSet.add(Float.hashCode(dictionary.getFloatValue(iterator.next())));
+          floatOpenHashSet.add(dictionary.getFloatValue(iterator.next()));
         }
-        break;
+        return floatOpenHashSet;
+
       case DOUBLE:
+        DoubleOpenHashSet doubleOpenHashSet = new 
DoubleOpenHashSet(dictIdBitmap.getCardinality());
         while (iterator.hasNext()) {
-          
valueSet.add(Double.hashCode(dictionary.getDoubleValue(iterator.next())));
+          doubleOpenHashSet.add(dictionary.getDoubleValue(iterator.next()));
         }
-        break;
+        return doubleOpenHashSet;
       case STRING:
+        ObjectOpenHashSet<byte[]> stringObjectOpenHashSet =
+            new ObjectOpenHashSet<byte[]>(dictIdBitmap.getCardinality());
         while (iterator.hasNext()) {
-          valueSet.add(dictionary.getStringValue(iterator.next()).hashCode());
+          
stringObjectOpenHashSet.add(dictionary.getStringValue(iterator.next()).getBytes(Charsets.UTF_8));
         }
-        break;
+        return stringObjectOpenHashSet;
+
       case BYTES:
+        ObjectOpenHashSet<byte[]> bytesObjectOpenHashSet =
+            new ObjectOpenHashSet<byte[]>(dictIdBitmap.getCardinality());
+
         while (iterator.hasNext()) {
-          
valueSet.add(Arrays.hashCode(dictionary.getBytesValue(iterator.next())));
+          
bytesObjectOpenHashSet.add((dictionary.getBytesValue(iterator.next())));
         }
-        break;
+        return bytesObjectOpenHashSet;
       default:
         throw new IllegalStateException("Illegal data type for DISTINCT_COUNT 
aggregation function: " + valueType);
     }
-    return valueSet;
   }
 
   private static final class DictIdsWrapper {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
index fb6b2e3..4c93181 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import java.util.AbstractCollection;
 import java.util.Map;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
@@ -58,8 +59,8 @@ public class DistinctCountMVAggregationFunction extends 
DistinctCountAggregation
     }
 
     // For non-dictionary-encoded expression, store hash code of the values 
into the value set
-    IntOpenHashSet valueSet = getValueSet(aggregationResultHolder);
     FieldSpec.DataType valueType = blockValSet.getValueType();
+    AbstractCollection valueSet = getValueSet(aggregationResultHolder, 
valueType);
     switch (valueType) {
       case INT:
         int[][] intValues = blockValSet.getIntValuesMV();
@@ -126,7 +127,7 @@ public class DistinctCountMVAggregationFunction extends 
DistinctCountAggregation
       case INT:
         int[][] intValues = blockValSet.getIntValuesMV();
         for (int i = 0; i < length; i++) {
-          IntOpenHashSet valueSet = getValueSet(groupByResultHolder, 
groupKeyArray[i]);
+          AbstractCollection valueSet = getValueSet(groupByResultHolder, 
groupKeyArray[i], valueType);
           for (int value : intValues[i]) {
             valueSet.add(value);
           }
@@ -135,36 +136,36 @@ public class DistinctCountMVAggregationFunction extends 
DistinctCountAggregation
       case LONG:
         long[][] longValues = blockValSet.getLongValuesMV();
         for (int i = 0; i < length; i++) {
-          IntOpenHashSet valueSet = getValueSet(groupByResultHolder, 
groupKeyArray[i]);
+          AbstractCollection valueSet = getValueSet(groupByResultHolder, 
groupKeyArray[i], valueType);
           for (long value : longValues[i]) {
-            valueSet.add(Long.hashCode(value));
+            valueSet.add(value);
           }
         }
         break;
       case FLOAT:
         float[][] floatValues = blockValSet.getFloatValuesMV();
         for (int i = 0; i < length; i++) {
-          IntOpenHashSet valueSet = getValueSet(groupByResultHolder, 
groupKeyArray[i]);
+          AbstractCollection valueSet = getValueSet(groupByResultHolder, 
groupKeyArray[i], valueType);
           for (float value : floatValues[i]) {
-            valueSet.add(Float.hashCode(value));
+            valueSet.add(value);
           }
         }
         break;
       case DOUBLE:
         double[][] doubleValues = blockValSet.getDoubleValuesMV();
         for (int i = 0; i < length; i++) {
-          IntOpenHashSet valueSet = getValueSet(groupByResultHolder, 
groupKeyArray[i]);
+          AbstractCollection valueSet = getValueSet(groupByResultHolder, 
groupKeyArray[i], valueType);
           for (double value : doubleValues[i]) {
-            valueSet.add(Double.hashCode(value));
+            valueSet.add(value);
           }
         }
         break;
       case STRING:
         String[][] stringValues = blockValSet.getStringValuesMV();
         for (int i = 0; i < length; i++) {
-          IntOpenHashSet valueSet = getValueSet(groupByResultHolder, 
groupKeyArray[i]);
+          AbstractCollection valueSet = getValueSet(groupByResultHolder, 
groupKeyArray[i], valueType);
           for (String value : stringValues[i]) {
-            valueSet.add(value.hashCode());
+            valueSet.add(value);
           }
         }
         break;
@@ -197,7 +198,7 @@ public class DistinctCountMVAggregationFunction extends 
DistinctCountAggregation
         int[][] intValues = blockValSet.getIntValuesMV();
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            IntOpenHashSet valueSet = getValueSet(groupByResultHolder, 
groupKey);
+            AbstractCollection valueSet = getValueSet(groupByResultHolder, 
groupKey, valueType);
             for (int value : intValues[i]) {
               valueSet.add(value);
             }
@@ -208,7 +209,7 @@ public class DistinctCountMVAggregationFunction extends 
DistinctCountAggregation
         long[][] longValues = blockValSet.getLongValuesMV();
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            IntOpenHashSet valueSet = getValueSet(groupByResultHolder, 
groupKey);
+            AbstractCollection valueSet = getValueSet(groupByResultHolder, 
groupKey, valueType);
             for (long value : longValues[i]) {
               valueSet.add(Long.hashCode(value));
             }
@@ -219,7 +220,7 @@ public class DistinctCountMVAggregationFunction extends 
DistinctCountAggregation
         float[][] floatValues = blockValSet.getFloatValuesMV();
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            IntOpenHashSet valueSet = getValueSet(groupByResultHolder, 
groupKey);
+            AbstractCollection valueSet = getValueSet(groupByResultHolder, 
groupKey, valueType);
             for (float value : floatValues[i]) {
               valueSet.add(Float.hashCode(value));
             }
@@ -230,7 +231,7 @@ public class DistinctCountMVAggregationFunction extends 
DistinctCountAggregation
         double[][] doubleValues = blockValSet.getDoubleValuesMV();
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            IntOpenHashSet valueSet = getValueSet(groupByResultHolder, 
groupKey);
+            AbstractCollection valueSet = getValueSet(groupByResultHolder, 
groupKey, valueType);
             for (double value : doubleValues[i]) {
               valueSet.add(Double.hashCode(value));
             }
@@ -241,7 +242,7 @@ public class DistinctCountMVAggregationFunction extends 
DistinctCountAggregation
         String[][] stringValues = blockValSet.getStringValuesMV();
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            IntOpenHashSet valueSet = getValueSet(groupByResultHolder, 
groupKey);
+            AbstractCollection valueSet = getValueSet(groupByResultHolder, 
groupKey, valueType);
             for (String value : stringValues[i]) {
               valueSet.add(value.hashCode());
             }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctRawBloomFilterAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctRawBloomFilterAggregationFunction.java
new file mode 100644
index 0000000..72e7edc
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctRawBloomFilterAggregationFunction.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+
+
+/**
+ * The DISTINCT clause in SQL is executed as the DISTINCT aggregation function.
+ * TODO: Support group-by
+ */
+@SuppressWarnings("rawtypes")
+public class DistinctAggregationFunction implements AggregationFunction<DistinctTable, Comparable> {
+  private final List<ExpressionContext> _expressions;
+  private final String[] _columns;
+  private final List<OrderByExpressionContext> _orderByExpressions;
+  private final int _limit;
+
+  /**
+   * Constructor for the class.
+   *
+   * @param expressions Distinct columns to return
+   * @param orderByExpressions Order By clause
+   * @param limit Limit clause
+   */
+  public DistinctAggregationFunction(List<ExpressionContext> expressions,
+      @Nullable List<OrderByExpressionContext> orderByExpressions, int limit) {
+    _expressions = expressions;
+    int numExpressions = expressions.size();
+    _columns = new String[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      _columns[i] = expressions.get(i).toString();
+    }
+    _orderByExpressions = orderByExpressions;
+    _limit = limit;
+  }
+
+  public String[] getColumns() {
+    return _columns;
+  }
+
+  public List<OrderByExpressionContext> getOrderByExpressions() {
+    return _orderByExpressions;
+  }
+
+  public int getLimit() {
+    return _limit;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCT;
+  }
+
+  @Override
+  public String getColumnName() {
+    return AggregationFunctionType.DISTINCT.getName() + "_" + AggregationFunctionUtils.concatArgs(_columns);
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.DISTINCT.getName().toLowerCase() + "(" + AggregationFunctionUtils
+        .concatArgs(_columns) + ")";
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return _expressions;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    int numBlockValSets = blockValSetMap.size();
+    int numExpressions = _expressions.size();
+    Preconditions
+        .checkState(numBlockValSets == numExpressions, "Size mismatch: numBlockValSets = %s, numExpressions = %s",
+            numBlockValSets, numExpressions);
+
+    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      blockValSets[i] = blockValSetMap.get(_expressions.get(i));
+    }
+
+    DistinctTable distinctTable = aggregationResultHolder.getResult();
+    if (distinctTable == null) {
+      ColumnDataType[] columnDataTypes = new ColumnDataType[numExpressions];
+      for (int i = 0; i < numExpressions; i++) {
+        columnDataTypes[i] = ColumnDataType.fromDataTypeSV(blockValSetMap.get(_expressions.get(i)).getValueType());
+      }
+      DataSchema dataSchema = new DataSchema(_columns, columnDataTypes);
+      distinctTable = new DistinctTable(dataSchema, _orderByExpressions, _limit);
+      aggregationResultHolder.setValue(distinctTable);
+    }
+
+    // TODO: Follow up PR will make few changes to start using DictionaryBasedAggregationOperator for DISTINCT queries
+    //       without filter.
+
+    if (distinctTable.hasOrderBy()) {
+      // With order-by, no need to check whether the DistinctTable is already satisfied
+      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
+      for (int i = 0; i < length; i++) {
+        distinctTable.addWithOrderBy(new Record(blockValueFetcher.getRow(i)));
+      }
+    } else {
+      // Without order-by, early-terminate when the DistinctTable is already satisfied
+      if (distinctTable.isSatisfied()) {
+        return;
+      }
+      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
+      for (int i = 0; i < length; i++) {
+        if (distinctTable.addWithoutOrderBy(new Record(blockValueFetcher.getRow(i)))) {
+          return;
+        }
+      }
+    }
+  }
+
+  @Override
+  public DistinctTable extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    DistinctTable distinctTable = aggregationResultHolder.getResult();
+    if (distinctTable != null) {
+      return distinctTable;
+    } else {
+      ColumnDataType[] columnDataTypes = new ColumnDataType[_columns.length];
+      // NOTE: Use STRING for unknown type
+      Arrays.fill(columnDataTypes, ColumnDataType.STRING);
+      return new DistinctTable(new DataSchema(_columns, columnDataTypes), _orderByExpressions, _limit);
+    }
+  }
+
+  /**
+   * NOTE: This method only handles merging of 2 main DistinctTables. It should not be used on Broker-side because it
+   *       does not support merging deserialized DistinctTables.
+   * <p>{@inheritDoc}
+   */
+  @Override
+  public DistinctTable merge(DistinctTable intermediateResult1, DistinctTable intermediateResult2) {
+    if (intermediateResult1.size() == 0) {
+      return intermediateResult2;
+    }
+    if (intermediateResult2.size() != 0) {
+      intermediateResult1.mergeMainDistinctTable(intermediateResult2);
+    }
+    return intermediateResult1;
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return false;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+
+  @Override
+  public DistinctTable extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+
+  @Override
+  public Comparable extractFinalResult(DistinctTable intermediateResult) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to