This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8404bc872e Add base class for custom object accumulator (#12685)
8404bc872e is described below

commit 8404bc872eb6540a9df75260c5206a2cd41d2ed1
Author: David Cromberge <davecrombe...@gmail.com>
AuthorDate: Mon Mar 25 23:33:29 2024 +0000

    Add base class for custom object accumulator (#12685)
---
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  72 +++++-
 ...ValueIntegerTupleSketchAggregationFunction.java |  22 +-
 .../DistinctCountCPCSketchAggregationFunction.java | 248 +++++++++++++------
 ...CountIntegerTupleSketchAggregationFunction.java |  12 +-
 ...stinctCountRawCPCSketchAggregationFunction.java |   8 +-
 .../IntegerTupleSketchAggregationFunction.java     | 271 +++++++++++++++------
 ...aluesIntegerTupleSketchAggregationFunction.java |  16 +-
 .../pinot/core/common/ObjectSerDeUtilsTest.java    |  54 ++++
 .../local/customobject/CpcSketchAccumulator.java   |  79 ++++++
 .../customobject/CustomObjectAccumulator.java      | 121 +++++++++
 .../local/customobject/ThetaSketchAccumulator.java |  53 +---
 ...mulator.java => TupleIntSketchAccumulator.java} |  93 +++----
 .../customobject/CpcSketchAccumulatorTest.java     |  92 +++++++
 .../TupleIntSketchAccumulatorTest.java             | 106 ++++++++
 14 files changed, 957 insertions(+), 290 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 14aa2d2f10..80483b2640 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -80,6 +80,7 @@ import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.query.utils.idset.IdSets;
 import org.apache.pinot.segment.local.customobject.AvgPair;
 import org.apache.pinot.segment.local.customobject.CovarianceTuple;
+import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator;
 import org.apache.pinot.segment.local.customobject.DoubleLongPair;
 import org.apache.pinot.segment.local.customobject.FloatLongPair;
 import org.apache.pinot.segment.local.customobject.IntLongPair;
@@ -89,6 +90,7 @@ import 
org.apache.pinot.segment.local.customobject.PinotFourthMoment;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
 import org.apache.pinot.segment.local.customobject.StringLongPair;
 import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
 import org.apache.pinot.segment.local.customobject.VarianceTuple;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -156,7 +158,9 @@ public class ObjectSerDeUtils {
     FloatArrayList(44),
     StringArrayList(45),
     UltraLogLog(46),
-    ThetaSketchAccumulator(47);
+    ThetaSketchAccumulator(47),
+    TupleIntSketchAccumulator(48),
+    CpcSketchAccumulator(49);
 
     private final int _value;
 
@@ -277,6 +281,10 @@ public class ObjectSerDeUtils {
         return ObjectType.UltraLogLog;
       } else if (value instanceof ThetaSketchAccumulator) {
         return ObjectType.ThetaSketchAccumulator;
+      } else if (value instanceof TupleIntSketchAccumulator) {
+        return ObjectType.TupleIntSketchAccumulator;
+      } else if (value instanceof CpcSketchAccumulator) {
+        return ObjectType.CpcSketchAccumulator;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + 
value.getClass().getSimpleName());
       }
@@ -1587,7 +1595,7 @@ public class ObjectSerDeUtils {
     }
   };
 
-  public static final ObjectSerDe<ThetaSketchAccumulator> 
DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE =
+  public static final ObjectSerDe<ThetaSketchAccumulator> 
DATA_SKETCH_THETA_ACCUMULATOR_SER_DE =
       new ObjectSerDe<ThetaSketchAccumulator>() {
 
         @Override
@@ -1614,6 +1622,62 @@ public class ObjectSerDeUtils {
         }
       };
 
+  public static final ObjectSerDe<TupleIntSketchAccumulator> 
DATA_SKETCH_INT_TUPLE_ACCUMULATOR_SER_DE =
+      new ObjectSerDe<TupleIntSketchAccumulator>() {
+
+        @Override
+        public byte[] serialize(TupleIntSketchAccumulator 
tupleIntSketchBuffer) {
+          org.apache.datasketches.tuple.Sketch<IntegerSummary> sketch = 
tupleIntSketchBuffer.getResult();
+          return sketch.toByteArray();
+        }
+
+        @Override
+        public TupleIntSketchAccumulator deserialize(byte[] bytes) {
+          return deserialize(ByteBuffer.wrap(bytes));
+        }
+
+        // Note: The accumulator is designed to serialize as a sketch and 
should
+        // not be deserialized in practice.
+        @Override
+        public TupleIntSketchAccumulator deserialize(ByteBuffer byteBuffer) {
+          TupleIntSketchAccumulator tupleIntSketchAccumulator = new 
TupleIntSketchAccumulator();
+          byte[] bytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(bytes);
+          org.apache.datasketches.tuple.Sketch<IntegerSummary> sketch =
+              
org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+                  new IntegerSummaryDeserializer());
+          tupleIntSketchAccumulator.apply(sketch);
+          return tupleIntSketchAccumulator;
+        }
+      };
+
+  public static final ObjectSerDe<CpcSketchAccumulator> 
DATA_SKETCH_CPC_ACCUMULATOR_SER_DE =
+      new ObjectSerDe<CpcSketchAccumulator>() {
+
+        @Override
+        public byte[] serialize(CpcSketchAccumulator cpcSketchBuffer) {
+          CpcSketch sketch = cpcSketchBuffer.getResult();
+          return sketch.toByteArray();
+        }
+
+        @Override
+        public CpcSketchAccumulator deserialize(byte[] bytes) {
+          return deserialize(ByteBuffer.wrap(bytes));
+        }
+
+        // Note: The accumulator is designed to serialize as a sketch and 
should
+        // not be deserialized in practice.
+        @Override
+        public CpcSketchAccumulator deserialize(ByteBuffer byteBuffer) {
+          CpcSketchAccumulator cpcSketchAccumulator = new 
CpcSketchAccumulator();
+          byte[] bytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(bytes);
+          CpcSketch sketch = CpcSketch.heapify(Memory.wrap(bytes));
+          cpcSketchAccumulator.apply(sketch);
+          return cpcSketchAccumulator;
+        }
+      };
+
   // NOTE: DO NOT change the order, it has to be the same order as the 
ObjectType
   //@formatter:off
   private static final ObjectSerDe[] SER_DES = {
@@ -1664,7 +1728,9 @@ public class ObjectSerDeUtils {
       FLOAT_ARRAY_LIST_SER_DE,
       STRING_ARRAY_LIST_SER_DE,
       ULTRA_LOG_LOG_OBJECT_SER_DE,
-      DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE,
+      DATA_SKETCH_THETA_ACCUMULATOR_SER_DE,
+      DATA_SKETCH_INT_TUPLE_ACCUMULATOR_SER_DE,
+      DATA_SKETCH_CPC_ACCUMULATOR_SER_DE,
   };
   //@formatter:on
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
index 3b3718dba2..16fd6751b2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
@@ -19,12 +19,12 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import java.util.List;
-import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
 import org.apache.datasketches.tuple.TupleSketchIterator;
-import org.apache.datasketches.tuple.Union;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
@@ -48,22 +48,20 @@ public class AvgValueIntegerTupleSketchAggregationFunction
   }
 
   @Override
-  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> 
integerSummarySketches) {
-    if (integerSummarySketches == null) {
+  public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) {
+    accumulator.setNominalEntries(_nominalEntries);
+    accumulator.setSetOperations(_setOps);
+    accumulator.setThreshold(_accumulatorThreshold);
+    Sketch<IntegerSummary> result = accumulator.getResult();
+    if (result.isEmpty() || result.getRetainedEntries() == 0) {
+      // there is nothing to average, return null
       return null;
     }
-    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
-    integerSummarySketches.forEach(union::union);
-    double retainedTotal = 0L;
-    CompactSketch<IntegerSummary> result = union.getResult();
     TupleSketchIterator<IntegerSummary> summaries = result.iterator();
+    double retainedTotal = 0L;
     while (summaries.next()) {
       retainedTotal += summaries.getSummary().getValue();
     }
-    if (result.getRetainedEntries() == 0) {
-      // there is nothing to average, return null
-      return null;
-    }
     double estimate = retainedTotal / result.getRetainedEntries();
     return Math.round(estimate);
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
index 1946200842..4a33086bb8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
@@ -21,16 +21,17 @@ package org.apache.pinot.core.query.aggregation.function;
 import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.datasketches.cpc.CpcSketch;
-import org.apache.datasketches.cpc.CpcUnion;
+import org.apache.datasketches.memory.Memory;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -80,8 +81,10 @@ import org.roaringbitmap.RoaringBitmap;
  */
 @SuppressWarnings({"rawtypes"})
 public class DistinctCountCPCSketchAggregationFunction
-    extends BaseSingleInputAggregationFunction<CpcSketch, Comparable> {
-  protected final int _lgK;
+    extends BaseSingleInputAggregationFunction<CpcSketchAccumulator, 
Comparable> {
+  private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2;
+  protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
+  protected int _lgNominalEntries;
 
   public DistinctCountCPCSketchAggregationFunction(List<ExpressionContext> 
arguments) {
     super(arguments.get(0));
@@ -92,9 +95,22 @@ public class DistinctCountCPCSketchAggregationFunction
     Preconditions.checkArgument(numExpressions <= 2, "DistinctCountCPC expects 
1 or 2 arguments, got: %s",
         numExpressions);
     if (arguments.size() == 2) {
-      _lgK = arguments.get(1).getLiteral().getIntValue();
+      ExpressionContext secondArgument = arguments.get(1);
+      Preconditions.checkArgument(secondArgument.getType() == 
ExpressionContext.Type.LITERAL,
+          "CPC Sketch Aggregation Function expects the second argument to be a 
literal (parameters)," + " but got: ",
+          secondArgument.getType());
+
+      if (secondArgument.getLiteral().getType() == FieldSpec.DataType.STRING) {
+        Parameters parameters = new 
Parameters(secondArgument.getLiteral().getStringValue());
+        // Allows the user to trade off memory usage for merge CPU; higher 
values use more memory
+        _accumulatorThreshold = parameters.getAccumulatorThreshold();
+        // Nominal entries controls sketch accuracy and size
+        _lgNominalEntries = parameters.getLgNominalEntries();
+      } else {
+        _lgNominalEntries = secondArgument.getLiteral().getIntValue();
+      }
     } else {
-      _lgK = CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK;
+      _lgNominalEntries = CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK;
     }
   }
 
@@ -123,15 +139,11 @@ public class DistinctCountCPCSketchAggregationFunction
     if (storedType == DataType.BYTES) {
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
       try {
-        CpcSketch cpcSketch = aggregationResultHolder.getResult();
-        CpcUnion union = new CpcUnion(_lgK);
-        if (cpcSketch != null) {
-          union.update(cpcSketch);
-        }
-        for (int i = 0; i < length; i++) {
-          
union.update(ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]));
+        CpcSketchAccumulator cpcSketchAccumulator = 
getAccumulator(aggregationResultHolder);
+        CpcSketch[] sketches = deserializeSketches(bytesValues, length);
+        for (CpcSketch sketch : sketches) {
+          cpcSketchAccumulator.apply(sketch);
         }
-        aggregationResultHolder.setValue(union.getResult());
       } catch (Exception e) {
         throw new RuntimeException("Caught exception while merging CPC 
sketches", e);
       }
@@ -182,6 +194,8 @@ public class DistinctCountCPCSketchAggregationFunction
       default:
         throw new IllegalStateException("Illegal data type for 
DISTINCT_COUNT_CPC aggregation function: " + storedType);
     }
+    CpcSketchAccumulator cpcSketchAccumulator = 
getAccumulator(aggregationResultHolder);
+    cpcSketchAccumulator.apply(cpcSketch);
   }
 
   @Override
@@ -191,24 +205,17 @@ public class DistinctCountCPCSketchAggregationFunction
 
     // Treat BYTES value as serialized CPC Sketch
     DataType storedType = blockValSet.getValueType().getStoredType();
-    if (storedType == DataType.BYTES) {
+    if (storedType == FieldSpec.DataType.BYTES) {
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
       try {
+        CpcSketch[] sketches = deserializeSketches(bytesValues, length);
         for (int i = 0; i < length; i++) {
-          CpcSketch value = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]);
-          int groupKey = groupKeyArray[i];
-          CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey);
-          if (cpcSketch != null) {
-            CpcUnion union = new CpcUnion(_lgK);
-            union.update(cpcSketch);
-            union.update(value);
-            groupByResultHolder.setValueForKey(groupKey, union.getResult());
-          } else {
-            groupByResultHolder.setValueForKey(groupKey, value);
-          }
+          CpcSketchAccumulator cpcSketchAccumulator = 
getAccumulator(groupByResultHolder, groupKeyArray[i]);
+          CpcSketch sketch = sketches[i];
+          cpcSketchAccumulator.apply(sketch);
         }
       } catch (Exception e) {
-        throw new RuntimeException("Caught exception while merging CPC 
sketches", e);
+        throw new RuntimeException("Caught exception while aggregating CPC 
Sketches", e);
       }
       return;
     }
@@ -267,25 +274,19 @@ public class DistinctCountCPCSketchAggregationFunction
 
     // Treat BYTES value as serialized CPC Sketch
     DataType storedType = blockValSet.getValueType().getStoredType();
-    if (storedType == DataType.BYTES) {
+    boolean singleValue = blockValSet.isSingleValue();
+
+    if (singleValue && storedType == DataType.BYTES) {
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
       try {
+        CpcSketch[] sketches = deserializeSketches(bytesValues, length);
         for (int i = 0; i < length; i++) {
-          CpcSketch value = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]);
           for (int groupKey : groupKeysArray[i]) {
-            CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey);
-            if (cpcSketch != null) {
-              CpcUnion union = new CpcUnion(_lgK);
-              union.update(cpcSketch);
-              union.update(value);
-              groupByResultHolder.setValueForKey(groupKey, union.getResult());
-            } else {
-              groupByResultHolder.setValueForKey(groupKey, value);
-            }
+            getAccumulator(groupByResultHolder, groupKey).apply(sketches[i]);
           }
         }
       } catch (Exception e) {
-        throw new RuntimeException("Caught exception while merging CPC 
sketches", e);
+        throw new RuntimeException("Caught exception while aggregating CPC 
sketches", e);
       }
       return;
     }
@@ -348,51 +349,50 @@ public class DistinctCountCPCSketchAggregationFunction
   }
 
   @Override
-  public CpcSketch extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+  public CpcSketchAccumulator extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
     Object result = aggregationResultHolder.getResult();
     if (result == null) {
-      return new CpcSketch(_lgK);
+      return new CpcSketchAccumulator(_lgNominalEntries, 
_accumulatorThreshold);
     }
 
-    if (result instanceof DictIdsWrapper) {
+    if (result instanceof CpcSketch) {
+      return convertSketchAccumulator(result);
+    } else if (result instanceof DictIdsWrapper) {
       // For dictionary-encoded expression, convert dictionary ids to CpcSketch
-      return convertToCpcSketch((DictIdsWrapper) result);
+      return convertSketchAccumulator(dictionaryToCpcSketch((DictIdsWrapper) 
result));
     } else {
-      // For non-dictionary-encoded expression, directly return the CpcSketch
-      return (CpcSketch) result;
+      return (CpcSketchAccumulator) result;
     }
   }
 
   @Override
-  public CpcSketch extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+  public CpcSketchAccumulator extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
     Object result = groupByResultHolder.getResult(groupKey);
     if (result == null) {
-      return new CpcSketch(_lgK);
+      return new CpcSketchAccumulator(_lgNominalEntries, 
_accumulatorThreshold);
     }
 
-    if (result instanceof DictIdsWrapper) {
+    if (result instanceof CpcSketch) {
+      return convertSketchAccumulator(result);
+    } else if (result instanceof DictIdsWrapper) {
       // For dictionary-encoded expression, convert dictionary ids to CpcSketch
-      return convertToCpcSketch((DictIdsWrapper) result);
+      return convertSketchAccumulator(dictionaryToCpcSketch((DictIdsWrapper) 
result));
     } else {
-      // For non-dictionary-encoded expression, directly return the CpcSketch
-      return (CpcSketch) result;
+      return (CpcSketchAccumulator) result;
     }
   }
 
   @Override
-  public CpcSketch merge(CpcSketch intermediateResult1, CpcSketch 
intermediateResult2) {
-    if (intermediateResult1 == null && intermediateResult2 != null) {
+  public CpcSketchAccumulator merge(CpcSketchAccumulator intermediateResult1,
+      CpcSketchAccumulator intermediateResult2) {
+    if (intermediateResult1 == null || intermediateResult1.isEmpty()) {
       return intermediateResult2;
-    } else if (intermediateResult1 != null && intermediateResult2 == null) {
+    }
+    if (intermediateResult2 == null || intermediateResult2.isEmpty()) {
       return intermediateResult1;
-    } else if (intermediateResult1 == null) {
-      return new CpcSketch(_lgK);
     }
-
-    CpcUnion union = new CpcUnion(_lgK);
-    union.update(intermediateResult1);
-    union.update(intermediateResult2);
-    return union.getResult();
+    intermediateResult1.merge(intermediateResult2);
+    return intermediateResult1;
   }
 
   @Override
@@ -406,8 +406,22 @@ public class DistinctCountCPCSketchAggregationFunction
   }
 
   @Override
-  public Comparable extractFinalResult(CpcSketch intermediateResult) {
-    return Math.round(intermediateResult.getEstimate());
+  public Comparable extractFinalResult(CpcSketchAccumulator 
intermediateResult) {
+    intermediateResult.setLgNominalEntries(_lgNominalEntries);
+    intermediateResult.setThreshold(_accumulatorThreshold);
+    return Math.round(intermediateResult.getResult().getEstimate());
+  }
+
+  /**
+   * Returns the CpcSketch from the result holder or creates a new one if it 
does not exist.
+   */
+  protected CpcSketch getCpcSketch(AggregationResultHolder 
aggregationResultHolder) {
+    CpcSketch cpcSketch = aggregationResultHolder.getResult();
+    if (cpcSketch == null) {
+      cpcSketch = new CpcSketch(_lgNominalEntries);
+      aggregationResultHolder.setValue(cpcSketch);
+    }
+    return cpcSketch;
   }
 
   /**
@@ -423,18 +437,6 @@ public class DistinctCountCPCSketchAggregationFunction
     return dictIdsWrapper._dictIdBitmap;
   }
 
-  /**
-   * Returns the CpcSketch from the result holder or creates a new one if it 
does not exist.
-   */
-  protected CpcSketch getCpcSketch(AggregationResultHolder 
aggregationResultHolder) {
-    CpcSketch cpcSketch = aggregationResultHolder.getResult();
-    if (cpcSketch == null) {
-      cpcSketch = new CpcSketch(_lgK);
-      aggregationResultHolder.setValue(cpcSketch);
-    }
-    return cpcSketch;
-  }
-
   /**
    * Returns the dictionary id bitmap for the given group key or creates a new 
one if it does not exist.
    */
@@ -454,7 +456,7 @@ public class DistinctCountCPCSketchAggregationFunction
   protected CpcSketch getCpcSketch(GroupByResultHolder groupByResultHolder, 
int groupKey) {
     CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey);
     if (cpcSketch == null) {
-      cpcSketch = new CpcSketch(_lgK);
+      cpcSketch = new CpcSketch(_lgNominalEntries);
       groupByResultHolder.setValueForKey(groupKey, cpcSketch);
     }
     return cpcSketch;
@@ -470,8 +472,8 @@ public class DistinctCountCPCSketchAggregationFunction
     }
   }
 
-  private CpcSketch convertToCpcSketch(DictIdsWrapper dictIdsWrapper) {
-    CpcSketch cpcSketch = new CpcSketch(_lgK);
+  private CpcSketch dictionaryToCpcSketch(DictIdsWrapper dictIdsWrapper) {
+    CpcSketch cpcSketch = new CpcSketch(_lgNominalEntries);
     Dictionary dictionary = dictIdsWrapper._dictionary;
     RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
     PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
@@ -528,6 +530,56 @@ public class DistinctCountCPCSketchAggregationFunction
     }
   }
 
+  /**
+   * Returns the accumulator from the result holder or creates a new one if it 
does not exist.
+   */
+  private CpcSketchAccumulator getAccumulator(AggregationResultHolder 
aggregationResultHolder) {
+    CpcSketchAccumulator accumulator = aggregationResultHolder.getResult();
+    if (accumulator == null) {
+      accumulator = new CpcSketchAccumulator(_lgNominalEntries, 
_accumulatorThreshold);
+      aggregationResultHolder.setValue(accumulator);
+    }
+    return accumulator;
+  }
+
+  /**
+   * Returns the accumulator for the given group key or creates a new one if 
it does not exist.
+   */
+  private CpcSketchAccumulator getAccumulator(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+    CpcSketchAccumulator accumulator = groupByResultHolder.getResult(groupKey);
+    if (accumulator == null) {
+      accumulator = new CpcSketchAccumulator(_lgNominalEntries, 
_accumulatorThreshold);
+      groupByResultHolder.setValueForKey(groupKey, accumulator);
+    }
+    return accumulator;
+  }
+
+  /**
+   * Deserializes the sketches from the bytes.
+   */
+  @SuppressWarnings({"unchecked"})
+  private CpcSketch[] deserializeSketches(byte[][] serializedSketches, int 
length) {
+    CpcSketch[] sketches = new CpcSketch[length];
+    for (int i = 0; i < length; i++) {
+      sketches[i] = CpcSketch.heapify(Memory.wrap(serializedSketches[i]));
+    }
+    return sketches;
+  }
+
+  // This ensures backward compatibility with servers that still return 
sketches directly.
+  // The AggregationDataTableReducer casts intermediate results to Objects and 
although the code compiles,
+  // types might still be incompatible at runtime due to type erasure.
+  // Due to performance overheads of redundant casts, this should be removed 
at some future point.
+  protected CpcSketchAccumulator convertSketchAccumulator(Object result) {
+    if (result instanceof CpcSketch) {
+      CpcSketch sketch = (CpcSketch) result;
+      CpcSketchAccumulator accumulator = new 
CpcSketchAccumulator(_lgNominalEntries, _accumulatorThreshold);
+      accumulator.apply(sketch);
+      return accumulator;
+    }
+    return (CpcSketchAccumulator) result;
+  }
+
   private static final class DictIdsWrapper {
     final Dictionary _dictionary;
     final RoaringBitmap _dictIdBitmap;
@@ -537,4 +589,44 @@ public class DistinctCountCPCSketchAggregationFunction
       _dictIdBitmap = new RoaringBitmap();
     }
   }
+
+  /**
+   * Helper class to wrap the CpcSketch parameters.  The initial values for 
the parameters are set to the
+   * same defaults as in the Apache Datasketches library.
+   */
+  private static class Parameters {
+    private static final char PARAMETER_DELIMITER = ';';
+    private static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
+    private static final String NOMINAL_ENTRIES_KEY = "nominalEntries";
+    private static final String ACCUMULATOR_THRESHOLD_KEY = 
"accumulatorThreshold";
+
+    private int _nominalEntries = (int) Math.pow(2, 
CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+    private int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
+
+    Parameters(String parametersString) {
+      StringUtils.deleteWhitespace(parametersString);
+      String[] keyValuePairs = StringUtils.split(parametersString, 
PARAMETER_DELIMITER);
+      for (String keyValuePair : keyValuePairs) {
+        String[] keyAndValue = StringUtils.split(keyValuePair, 
PARAMETER_KEY_VALUE_SEPARATOR);
+        Preconditions.checkArgument(keyAndValue.length == 2, "Invalid 
parameter: %s", keyValuePair);
+        String key = keyAndValue[0];
+        String value = keyAndValue[1];
+        if (key.equalsIgnoreCase(NOMINAL_ENTRIES_KEY)) {
+          _nominalEntries = Integer.parseInt(value);
+        } else if (key.equalsIgnoreCase(ACCUMULATOR_THRESHOLD_KEY)) {
+          _accumulatorThreshold = Integer.parseInt(value);
+        } else {
+          throw new IllegalArgumentException("Invalid parameter key: " + key);
+        }
+      }
+    }
+
+    int getLgNominalEntries() {
+      return 
org.apache.datasketches.common.Util.exactLog2OfInt(_nominalEntries);
+    }
+
+    int getAccumulatorThreshold() {
+      return _accumulatorThreshold;
+    }
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
index 087337472d..68ec18e401 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
@@ -19,11 +19,10 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import java.util.List;
-import org.apache.datasketches.tuple.CompactSketch;
-import org.apache.datasketches.tuple.Union;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
@@ -46,9 +45,10 @@ public class 
DistinctCountIntegerTupleSketchAggregationFunction extends IntegerT
   }
 
   @Override
-  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> 
integerSummarySketches) {
-    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
-    integerSummarySketches.forEach(union::union);
-    return Double.valueOf(union.getResult().getEstimate()).longValue();
+  public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) {
+    accumulator.setNominalEntries(_nominalEntries);
+    accumulator.setSetOperations(_setOps);
+    accumulator.setThreshold(_accumulatorThreshold);
+    return Double.valueOf(accumulator.getResult().getEstimate()).longValue();
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
index ab153c8835..ff3a587881 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
@@ -19,9 +19,9 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import java.util.List;
-import org.apache.datasketches.cpc.CpcSketch;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator;
 import org.apache.pinot.segment.local.customobject.SerializedCPCSketch;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
@@ -47,7 +47,9 @@ public class DistinctCountRawCPCSketchAggregationFunction 
extends DistinctCountC
   }
 
   @Override
-  public SerializedCPCSketch extractFinalResult(CpcSketch sketch) {
-    return new SerializedCPCSketch(sketch);
+  public SerializedCPCSketch extractFinalResult(CpcSketchAccumulator 
intermediateResult) {
+    intermediateResult.setLgNominalEntries(_lgNominalEntries);
+    intermediateResult.setThreshold(_accumulatorThreshold);
+    return new SerializedCPCSketch(intermediateResult.getResult());
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
index 1fdace955c..992ef5d7a1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
@@ -19,41 +19,93 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Base64;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.tuple.Sketch;
-import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.Sketches;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummaryDeserializer;
 import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.utils.CommonConstants;
 
 
-/***
- * This is the base class for all Integer Tuple Sketch aggregations
+/**
+ * The {@code IntegerTupleSketchAggregationFunction} is the base class for all 
integer-based Tuple Sketch aggregations.
+ * Apache Datasketches Tuple Sketches are an extension of the Apache 
Datasketches Theta Sketch. Tuple sketches store an
+ * additional summary value with each retained entry which makes the sketch 
ideal for summarizing attributes
+ * such as impressions or clicks.
+ *
+ * Tuple sketches are interoperable with the Theta Sketch and enable set 
operations over a stream of data, and can
+ * also be used for cardinality estimation.
+ *
+ * Note: The current implementation of this aggregation function is limited to 
binary columns that contain sketches
+ * built outside of Pinot.
  *
- * Note that it only supports BYTES columns containing serialized sketches 
currently, but could be expanded to more
+ * Usage examples:
+ * <ul>
+ *   <li>
+ *     Simple union (1 or 2 arguments): main expression to aggregate on, 
followed by an optional Tuple sketch size
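+ *     argument. The second argument is the nominal number of entries k (an integer literal, not lgK) or a
+ *     parameter string such as 'nominalEntries=4096;accumulatorThreshold=4'; k defaults to 2^DEFAULT_TUPLE_SKETCH_LGK.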
+ *     The "raw" equivalents return serialised sketches in base64-encoded 
strings.
+ *     <p>DISTINCT_COUNT_TUPLE_SKETCH(col)</p>
+ *     <p>DISTINCT_COUNT_TUPLE_SKETCH(col, 12)</p>
+ *     <p>DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col)</p>
+ *     <p>DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col, 12)</p>
+ *   </li>
+ *   <li>
+ *     Extracting a cardinality estimate from a Tuple sketch:
+ *     <p>GET_INT_TUPLE_SKETCH_ESTIMATE(sketch_bytes)</p>
+ *     
<p>GET_INT_TUPLE_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_TUPLE_SKETCH(col))</p>
+ *   </li>
+ *   <li>
+ *     Union between two sketches, where summaries are merged using addition for hash keys in common:
+ *     <p>
+ *       INT_SUM_TUPLE_SKETCH_UNION(
+ *         DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col1),
+ *         DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col2)
+ *       )
+ *     </p>
+ *   </li>
+ *   <li>
+ *     Union between two sketches, where summaries are merged using maximum for hash keys in common:
+ *     <p>
+ *       INT_MAX_TUPLE_SKETCH_UNION(
+ *         DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col1),
+ *         DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col2)
+ *       )
+ *     </p>
+ *   </li>
+ *   <li>
+ *     Union between two sketches, where summaries are merged using minimum for hash keys in common:
+ *     <p>
+ *       INT_MIN_TUPLE_SKETCH_UNION(
+ *         DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col1),
+ *         DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col2)
+ *       )
+ *     </p>
+ *  </li>
+ * </ul>
  */
+@SuppressWarnings({"rawtypes"})
 public class IntegerTupleSketchAggregationFunction
-    extends 
BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, 
Comparable> {
+    extends BaseSingleInputAggregationFunction<TupleIntSketchAccumulator, 
Comparable> {
+  private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2;
   final ExpressionContext _expressionContext;
   final IntegerSummarySetOperations _setOps;
-  final int _entries;
+  protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
+  protected int _nominalEntries;
 
   public IntegerTupleSketchAggregationFunction(List<ExpressionContext> 
arguments, IntegerSummary.Mode mode) {
     super(arguments.get(0));
@@ -65,11 +117,20 @@ public class IntegerTupleSketchAggregationFunction
     if (arguments.size() == 2) {
       ExpressionContext secondArgument = arguments.get(1);
       Preconditions.checkArgument(secondArgument.getType() == 
ExpressionContext.Type.LITERAL,
-          "Tuple Sketch Aggregation Function expects the second argument to be 
a literal (number of entries to keep),"
-              + " but got: ", secondArgument.getType());
-      _entries = secondArgument.getLiteral().getIntValue();
+          "Tuple Sketch Aggregation Function expects the second argument to be 
a literal (parameters)," + " but got: ",
+          secondArgument.getType());
+
+      if (secondArgument.getLiteral().getType() == FieldSpec.DataType.STRING) {
+        Parameters parameters = new 
Parameters(secondArgument.getLiteral().getStringValue());
+        // Allows the user to trade-off memory usage for merge CPU; higher 
values use more memory
+        _accumulatorThreshold = parameters.getAccumulatorThreshold();
+        // Nominal entries controls sketch accuracy and size
+        _nominalEntries = parameters.getNominalEntries();
+      } else {
+        _nominalEntries = secondArgument.getLiteral().getIntValue();
+      }
     } else {
-      _entries = (int) Math.pow(2, 
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+      _nominalEntries = (int) Math.pow(2, 
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
     }
   }
 
@@ -99,20 +160,13 @@ public class IntegerTupleSketchAggregationFunction
     if (storedType == FieldSpec.DataType.BYTES) {
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
       try {
-        List<CompactSketch<IntegerSummary>> integerSketch = 
aggregationResultHolder.getResult();
-        if (integerSketch != null) {
-          List<CompactSketch<IntegerSummary>> sketches =
-              
Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
-                  .map(Sketch::compact).collect(Collectors.toList());
-          
aggregationResultHolder.setValue(merge(aggregationResultHolder.getResult(), 
sketches));
-        } else {
-          List<CompactSketch<IntegerSummary>> sketches =
-              
Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
-                  .map(Sketch::compact).collect(Collectors.toList());
-          aggregationResultHolder.setValue(sketches);
+        TupleIntSketchAccumulator tupleIntSketchAccumulator = 
getAccumulator(aggregationResultHolder);
+        Sketch<IntegerSummary>[] sketches = deserializeSketches(bytesValues, 
length);
+        for (Sketch<IntegerSummary> sketch : sketches) {
+          tupleIntSketchAccumulator.apply(sketch);
         }
       } catch (Exception e) {
-        throw new RuntimeException("Caught exception while merging Tuple 
Sketches", e);
+        throw new RuntimeException("Caught exception while aggregating Tuple 
Sketches", e);
       }
     } else {
       throw new IllegalStateException("Illegal data type for " + getType() + " 
aggregation function: " + storedType);
@@ -131,21 +185,14 @@ public class IntegerTupleSketchAggregationFunction
     if (storedType == FieldSpec.DataType.BYTES) {
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
       try {
+        Sketch<IntegerSummary>[] sketches = deserializeSketches(bytesValues, 
length);
         for (int i = 0; i < length; i++) {
-          byte[] value = bytesValues[i];
-          int groupKey = groupKeyArray[i];
-          CompactSketch<IntegerSummary> newSketch =
-              
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
-          if (groupByResultHolder.getResult(groupKey) == null) {
-            ArrayList<CompactSketch<IntegerSummary>> newList = new 
ArrayList<>();
-            newList.add(newSketch);
-            groupByResultHolder.setValueForKey(groupKey, newList);
-          } else {
-            
groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
-          }
+          TupleIntSketchAccumulator tupleIntSketchAccumulator = 
getAccumulator(groupByResultHolder, groupKeyArray[i]);
+          Sketch<IntegerSummary> sketch = sketches[i];
+          tupleIntSketchAccumulator.apply(sketch);
         }
       } catch (Exception e) {
-        throw new RuntimeException("Caught exception while merging Tuple 
Sketches", e);
+        throw new RuntimeException("Caught exception while aggregating Tuple 
Sketches", e);
       }
     } else {
       throw new IllegalStateException(
@@ -156,47 +203,55 @@ public class IntegerTupleSketchAggregationFunction
   @Override
   public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    byte[][] valueArray = blockValSetMap.get(_expression).getBytesValuesSV();
-    for (int i = 0; i < length; i++) {
-      byte[] value = valueArray[i];
-      CompactSketch<IntegerSummary> newSketch =
-          
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
-      for (int groupKey : groupKeysArray[i]) {
-        if (groupByResultHolder.getResult(groupKey) == null) {
-          groupByResultHolder.setValueForKey(groupKey, 
Collections.singletonList(newSketch));
-        } else {
-          
groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+    boolean singleValue = blockValSet.isSingleValue();
+
+    if (singleValue && storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = 
blockValSetMap.get(_expression).getBytesValuesSV();
+      try {
+        Sketch<IntegerSummary>[] sketches = deserializeSketches(bytesValues, 
length);
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            getAccumulator(groupByResultHolder, groupKey).apply(sketches[i]);
+          }
         }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while aggregating Tuple 
Sketches", e);
       }
+    } else {
+      throw new IllegalStateException(
+          "Illegal data type for INTEGER_TUPLE_SKETCH_UNION aggregation 
function: " + storedType);
     }
   }
 
   @Override
-  public List<CompactSketch<IntegerSummary>> 
extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
-    return aggregationResultHolder.getResult();
+  public TupleIntSketchAccumulator 
extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    TupleIntSketchAccumulator result = aggregationResultHolder.getResult();
+    if (result == null) {
+      return new TupleIntSketchAccumulator(_setOps, _nominalEntries, 
_accumulatorThreshold);
+    }
+    return result;
   }
 
   @Override
-  public List<CompactSketch<IntegerSummary>> 
extractGroupByResult(GroupByResultHolder groupByResultHolder,
-      int groupKey) {
+  public TupleIntSketchAccumulator extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
     return groupByResultHolder.getResult(groupKey);
   }
 
   @Override
-  public List<CompactSketch<IntegerSummary>> 
merge(List<CompactSketch<IntegerSummary>> intermediateResult1,
-      List<CompactSketch<IntegerSummary>> intermediateResult2) {
-    if (intermediateResult1 == null && intermediateResult2 != null) {
+  public TupleIntSketchAccumulator merge(TupleIntSketchAccumulator 
intermediateResult1,
+      TupleIntSketchAccumulator intermediateResult2) {
+    if (intermediateResult1 == null || intermediateResult1.isEmpty()) {
       return intermediateResult2;
-    } else if (intermediateResult1 != null && intermediateResult2 == null) {
+    }
+    if (intermediateResult2 == null || intermediateResult2.isEmpty()) {
       return intermediateResult1;
-    } else if (intermediateResult1 == null && intermediateResult2 == null) {
-      return new ArrayList<>(0);
     }
-    ArrayList<CompactSketch<IntegerSummary>> merged =
-        new ArrayList<>(intermediateResult1.size() + 
intermediateResult2.size());
-    merged.addAll(intermediateResult1);
-    merged.addAll(intermediateResult2);
-    return merged;
+    intermediateResult1.merge(intermediateResult2);
+    return intermediateResult1;
   }
 
   @Override
@@ -210,12 +265,86 @@ public class IntegerTupleSketchAggregationFunction
   }
 
   @Override
-  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> 
integerSummarySketches) {
-    if (integerSummarySketches == null) {
-      return null;
+  public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) {
+    accumulator.setNominalEntries(_nominalEntries);
+    accumulator.setSetOperations(_setOps);
+    accumulator.setThreshold(_accumulatorThreshold);
+    return 
Base64.getEncoder().encodeToString(accumulator.getResult().toByteArray());
+  }
+
+  /**
+   * Returns the accumulator from the result holder or creates a new one if it 
does not exist.
+   */
+  private TupleIntSketchAccumulator getAccumulator(AggregationResultHolder 
aggregationResultHolder) {
+    TupleIntSketchAccumulator accumulator = 
aggregationResultHolder.getResult();
+    if (accumulator == null) {
+      accumulator = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 
_accumulatorThreshold);
+      aggregationResultHolder.setValue(accumulator);
+    }
+    return accumulator;
+  }
+
+  /**
+   * Returns the accumulator for the given group key or creates a new one if 
it does not exist.
+   */
+  private TupleIntSketchAccumulator getAccumulator(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+    TupleIntSketchAccumulator accumulator = 
groupByResultHolder.getResult(groupKey);
+    if (accumulator == null) {
+      accumulator = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 
_accumulatorThreshold);
+      groupByResultHolder.setValueForKey(groupKey, accumulator);
+    }
+    return accumulator;
+  }
+
+  /**
+   * Deserializes the first {@code length} entries of the given byte arrays into Tuple sketches using
+   * {@code Sketches.heapifySketch}.
+   *
+   * @param serializedSketches serialized integer Tuple sketches
+   * @param length number of leading entries of {@code serializedSketches} to deserialize
+   * @return an array of {@code length} sketches, in the same order as the input
+   */
+  @SuppressWarnings({"unchecked"})
+  private Sketch<IntegerSummary>[] deserializeSketches(byte[][] serializedSketches, int length) {
+    // Generic arrays cannot be instantiated directly, hence the raw array and the unchecked suppression
+    Sketch<IntegerSummary>[] sketches = new Sketch[length];
+    // The deserializer is loop-invariant: create it once instead of once per sketch
+    IntegerSummaryDeserializer deserializer = new IntegerSummaryDeserializer();
+    for (int i = 0; i < length; i++) {
+      sketches[i] = Sketches.heapifySketch(Memory.wrap(serializedSketches[i]), deserializer);
+    }
+    return sketches;
+  }
+
+  /**
+   * Helper class to wrap the tuple-sketch parameters.  The initial values for 
the parameters are set to the
+   * same defaults in the Apache Datasketches library.
+   */
+  private static class Parameters {
+    private static final char PARAMETER_DELIMITER = ';';
+    private static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
+    private static final String NOMINAL_ENTRIES_KEY = "nominalEntries";
+    private static final String ACCUMULATOR_THRESHOLD_KEY = 
"accumulatorThreshold";
+
+    private int _nominalEntries = (int) Math.pow(2, 
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+    private int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
+
+    /**
+     * Parses a parameter string of the form {@code key1=value1;key2=value2}.  Whitespace is removed before
+     * parsing and keys are matched case-insensitively.  Parameters that are absent keep their default values.
+     *
+     * @param parametersString the raw parameter string
+     * @throws IllegalArgumentException if a pair is not of the form {@code key=value}, if the key is unknown,
+     *         or if a value is not a valid integer ({@link NumberFormatException})
+     */
+    Parameters(String parametersString) {
+      // deleteWhitespace returns a new string (Strings are immutable); the result must be used, otherwise
+      // inputs such as "nominalEntries = 4096; accumulatorThreshold = 4" fail to parse
+      String normalized = StringUtils.deleteWhitespace(parametersString);
+      String[] keyValuePairs = StringUtils.split(normalized, PARAMETER_DELIMITER);
+      for (String keyValuePair : keyValuePairs) {
+        String[] keyAndValue = StringUtils.split(keyValuePair, PARAMETER_KEY_VALUE_SEPARATOR);
+        Preconditions.checkArgument(keyAndValue.length == 2, "Invalid parameter: %s", keyValuePair);
+        String key = keyAndValue[0];
+        String value = keyAndValue[1];
+        if (key.equalsIgnoreCase(NOMINAL_ENTRIES_KEY)) {
+          _nominalEntries = Integer.parseInt(value);
+        } else if (key.equalsIgnoreCase(ACCUMULATOR_THRESHOLD_KEY)) {
+          _accumulatorThreshold = Integer.parseInt(value);
+        } else {
+          throw new IllegalArgumentException("Invalid parameter key: " + key);
+        }
+      }
+    }
+
+    int getNominalEntries() {
+      return _nominalEntries;
+    }
+
+    int getAccumulatorThreshold() {
+      return _accumulatorThreshold;
     }
-    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
-    integerSummarySketches.forEach(union::union);
-    return Base64.getEncoder().encodeToString(union.getResult().toByteArray());
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
index 33f746a1da..d37854b1b0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
@@ -19,12 +19,12 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import java.util.List;
-import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
 import org.apache.datasketches.tuple.TupleSketchIterator;
-import org.apache.datasketches.tuple.Union;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
@@ -46,14 +46,12 @@ public class SumValuesIntegerTupleSketchAggregationFunction 
extends IntegerTuple
   }
 
   @Override
-  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> 
integerSummarySketches) {
-    if (integerSummarySketches == null) {
-      return null;
-    }
-    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
-    integerSummarySketches.forEach(union::union);
+  public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) {
     double retainedTotal = 0L;
-    CompactSketch<IntegerSummary> result = union.getResult();
+    accumulator.setNominalEntries(_nominalEntries);
+    accumulator.setSetOperations(_setOps);
+    accumulator.setThreshold(_accumulatorThreshold);
+    Sketch<IntegerSummary> result = accumulator.getResult();
     TupleSketchIterator<IntegerSummary> summaries = result.iterator();
     while (summaries.next()) {
       retainedTotal += summaries.getSummary().getValue();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 01c39b0105..b397bd151c 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -43,9 +43,13 @@ import org.apache.datasketches.theta.SetOperationBuilder;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Sketches;
 import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
 import 
org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
 import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator;
 import org.apache.pinot.segment.local.customobject.DoubleLongPair;
 import org.apache.pinot.segment.local.customobject.FloatLongPair;
 import org.apache.pinot.segment.local.customobject.IntLongPair;
@@ -54,6 +58,7 @@ import 
org.apache.pinot.segment.local.customobject.MinMaxRangePair;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
 import org.apache.pinot.segment.local.customobject.StringLongPair;
 import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
 import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
 import org.testng.annotations.Test;
@@ -522,4 +527,53 @@ public class ObjectSerDeUtilsTest {
       assertEquals(actual.getResult().toByteArray(), sketch.toByteArray(), 
ERROR_MESSAGE);
     }
   }
+
+  @Test
+  public void testTupleIntSketchAccumulator() {
+    for (int i = 0; i < NUM_ITERATIONS; i++) {
+      int lgK = 4;
+      int size = RANDOM.nextInt(100) + 10;
+      IntegerSketch input = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+
+      for (int j = 0; j < size; j++) {
+        input.update(j, RANDOM.nextInt(100));
+      }
+
+      IntegerSummarySetOperations setOps =
+          new IntegerSummarySetOperations(IntegerSummary.Mode.Sum, 
IntegerSummary.Mode.Sum);
+      TupleIntSketchAccumulator accumulator = new 
TupleIntSketchAccumulator(setOps, (int) Math.pow(2, lgK), 2);
+      org.apache.datasketches.tuple.Sketch<IntegerSummary> sketch = 
input.compact();
+      accumulator.apply(sketch);
+
+      byte[] bytes = ObjectSerDeUtils.serialize(accumulator);
+      TupleIntSketchAccumulator actual =
+          ObjectSerDeUtils.deserialize(bytes, 
ObjectSerDeUtils.ObjectType.TupleIntSketchAccumulator);
+
+      assertEquals(actual.getResult().getEstimate(), sketch.getEstimate(), 
ERROR_MESSAGE);
+      assertEquals(actual.getResult().toByteArray(), sketch.toByteArray(), 
ERROR_MESSAGE);
+    }
+  }
+
+  @Test
+  public void testCpcSketchAccumulator() {
+    int lgK = 4;
+    for (int i = 0; i < NUM_ITERATIONS; i++) {
+      int size = RANDOM.nextInt(100) + 10;
+      CpcSketch sketch = new CpcSketch(lgK);
+
+      for (int j = 0; j < size; j++) {
+        sketch.update(j);
+      }
+
+      CpcSketchAccumulator accumulator = new CpcSketchAccumulator(lgK, 2);
+      accumulator.apply(sketch);
+
+      byte[] bytes = ObjectSerDeUtils.serialize(accumulator);
+      CpcSketchAccumulator actual =
+          ObjectSerDeUtils.deserialize(bytes, 
ObjectSerDeUtils.ObjectType.CpcSketchAccumulator);
+
+      assertEquals(actual.getResult().getEstimate(), sketch.getEstimate(), 
ERROR_MESSAGE);
+      assertEquals(actual.getResult().toByteArray(), sketch.toByteArray(), 
ERROR_MESSAGE);
+    }
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulator.java
new file mode 100644
index 0000000000..7d24da87cd
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulator.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import javax.annotation.Nonnull;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
+
+
+/**
+ * Intermediate state used by {@code 
DistinctCountCPCSketchAggregationFunction} which gives
+ * the end user more control over how sketches are merged for performance.
+ * The end user can set parameters that trade-off more memory usage for more 
pre-aggregation.
+ */
+public class CpcSketchAccumulator extends CustomObjectAccumulator<CpcSketch> {
+  private int _lgNominalEntries = 4;
+  private CpcUnion _union;
+
+  /**
+   * Creates an accumulator with the default threshold and lgNominalEntries.  Both can be re-set afterwards via
+   * {@link #setThreshold(int)} and {@link #setLgNominalEntries(int)}.
+   */
+  public CpcSketchAccumulator() {
+  }
+
+  // Note: The accumulator is serialized as a sketch.  This means that the majority of the processing
+  // happens on serialization. Therefore, when deserialized, the values may be null and will
+  // require re-initialisation. Since the primary use case is at query time for the Broker
+  // and Server, these properties are already in memory and are re-set.
+  /**
+   * @param lgNominalEntries the lgK passed to the internal {@code CpcUnion} when it is created.
+   * @param threshold the number of buffered sketches that triggers a merge into the union [> 0].
+   */
+  public CpcSketchAccumulator(int lgNominalEntries, int threshold) {
+    super(threshold);
+    _lgNominalEntries = lgNominalEntries;
+  }
+
+  /**
+   * Sets the lgK used when the internal union is created.  The union is created lazily on the first call to
+   * {@link #getResult()}; changing the value after that point has no effect.
+   */
+  public void setLgNominalEntries(int lgNominalEntries) {
+    _lgNominalEntries = lgNominalEntries;
+  }
+
+  /**
+   * Folds all buffered sketches into the internal union and returns the merged sketch.
+   */
+  @Nonnull
+  @Override
+  public CpcSketch getResult() {
+    return unionAll();
+  }
+
+  private CpcSketch unionAll() {
+    if (_union == null) {
+      _union = new CpcUnion(_lgNominalEntries);
+    }
+    // Return the default update "gadget" sketch as a compact sketch
+    if (isEmpty()) {
+      return _union.getResult();
+    }
+    // Corner-case: the parameters are not strictly respected when there is a single sketch.
+    // This single sketch might have been the result of a previously accumulated union and
+    // would already have the parameters set.  The sketch is returned as-is without adjusting
+    // nominal entries which requires an additional union operation.
+    // The shortcut is only safe while that sketch is still buffered and is not about to be flushed:
+    // with a threshold of 1 the base class calls getResult() and then clears the buffer after every
+    // input, so the sketch must be folded into the union or it would be lost (and a later get(0)
+    // on the emptied buffer would throw).
+    if (getNumInputs() == 1 && _accumulator.size() == 1 && getThreshold() > 1) {
+      return _accumulator.get(0);
+    }
+
+    for (CpcSketch accumulatedSketch : _accumulator) {
+      _union.update(accumulatedSketch);
+    }
+
+    return _union.getResult();
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CustomObjectAccumulator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CustomObjectAccumulator.java
new file mode 100644
index 0000000000..3e90695bec
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CustomObjectAccumulator.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import javax.annotation.Nonnull;
+
+
+/**
+ * Intermediate state used by some aggregation functions which gives the end 
user more control over how custom objects
+ * are merged for performance reasons.  Some custom objects such as 
DataSketches have better merge performance when
+ * more than two items are merged at once through the elimination of 
intermediate bookkeeping overheads.
+ *
+ * The end user can set a value for the "threshold" parameter that defers the 
merge operation until at least as many
+ * items are ready to be merged, or the callee forces the merge directly via 
"getResult" - e.g. on serialization.
+ * This data structure trades-off more memory usage for a greater degree of 
pre-aggregation in the accumulator state.
+ */
+public abstract class CustomObjectAccumulator<T> {
+  protected ArrayList<T> _accumulator;
+  private int _threshold;
+  private int _numInputs = 0;
+
+  /**
+   * Creates an accumulator with a threshold of 2.
+   */
+  public CustomObjectAccumulator() {
+    this(2);
+  }
+
+  /**
+   * Creates an accumulator with the given threshold.
+   * @param threshold the threshold [> 0].
+   */
+  public CustomObjectAccumulator(int threshold) {
+    setThreshold(threshold);
+  }
+
+  /**
+   * Sets the threshold that determines how much memory to use for the internal accumulator state before
+   * the intermediate state is merged.
+   * @param threshold the threshold [> 0].
+   */
+  public void setThreshold(int threshold) {
+    Preconditions.checkArgument(threshold > 0, "Invalid threshold: %s, must be positive", threshold);
+    _threshold = threshold;
+  }
+
+  /**
+   * Returns the configured threshold for this accumulator.
+   */
+  public int getThreshold() {
+    return _threshold;
+  }
+
+  /**
+   * Returns the number of inputs that have been added to the accumulator state.
+   */
+  public int getNumInputs() {
+    return _numInputs;
+  }
+
+  /**
+   * Returns true if no inputs have been added to the accumulator state.
+   */
+  public boolean isEmpty() {
+    return _numInputs == 0;
+  }
+
+  /**
+   * Forces the item T in internal state to be merged with all pending items in the accumulator state
+   * and returns the result.  This should not result in the accumulator state being updated or cleared.
+   * @return T result of the merge.
+   */
+  // The annotation follows the Javadoc so that the comment is attached to the method declaration
+  @Nonnull
+  public abstract T getResult();
+
+  /**
+   * Merges another accumulator with this one, by extracting the result from "other".
+   * @param other the custom object accumulator to merge.
+   */
+  public void merge(CustomObjectAccumulator<T> other) {
+    if (other.isEmpty()) {
+      return;
+    }
+    T result = other.getResult();
+    applyInternal(result);
+  }
+
+  /**
+   * Adds a new item to the accumulator state.  If the accumulator state is equal to the threshold value,
+   * the internal state is updated and the accumulator state is cleared.
+   * @param item the item to add to the accumulator state, cannot be null.
+   */
+  public void apply(T item) {
+    Preconditions.checkNotNull(item);
+    applyInternal(item);
+  }
+
+  private void applyInternal(T item) {
+    if (_accumulator == null) {
+      _accumulator = new ArrayList<>(_threshold);
+    }
+    _accumulator.add(item);
+    _numInputs += 1;
+
+    if (_accumulator.size() >= _threshold) {
+      // getResult() is called for its side effect only: the implementation is expected to fold the
+      // pending items into its own retained state, because the buffer is cleared right after
+      getResult();
+      _accumulator.clear();
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
index c9554ce9bf..5e2219f12d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.segment.local.customobject;
 
-import java.util.ArrayList;
 import java.util.Comparator;
 import javax.annotation.Nonnull;
 import org.apache.datasketches.theta.SetOperationBuilder;
@@ -29,19 +28,15 @@ import org.apache.datasketches.theta.Union;
 /**
  * Intermediate state used by {@code 
DistinctCountThetaSketchAggregationFunction} which gives
  * the end user more control over how sketches are merged for performance.
- * The end user can set parameters that trade-off more memory usage for more 
pre-aggregation.
- * This permits use of the Union "early-stop" optimisation where ordered 
sketches require no further
+ * In particular, the Theta Sketch Union "early-stop" optimisation can be used 
- ordered sketches require no further
  * processing beyond the minimum Theta value.
  * The union operation initialises an empty "gadget" bookkeeping sketch that 
is updated with hashed entries
- * that fall below the minimum Theta value for all input sketches ("Broder 
Rule").  When the initial
- * Theta value is set to the minimum immediately, further gains can be 
realised.
+ * that fall below the minimum Theta value for all input sketches ("Broder 
Rule").  When the initial Theta value is
+ * set to the minimum immediately, further gains can be realised.
  */
-public class ThetaSketchAccumulator {
-  private ArrayList<Sketch> _accumulator;
+public class ThetaSketchAccumulator extends CustomObjectAccumulator<Sketch> {
   private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
   private Union _union;
-  private int _threshold;
-  private int _numInputs = 0;
 
   public ThetaSketchAccumulator() {
   }
@@ -51,54 +46,20 @@ public class ThetaSketchAccumulator {
   // require re-initialisation. Since the primary use case is at query time 
for the Broker
   // and Server, these properties are already in memory and are re-set.
   public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, int 
threshold) {
+    super(threshold);
     _setOperationBuilder = setOperationBuilder;
-    _threshold = threshold;
   }
 
   public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
     _setOperationBuilder = setOperationBuilder;
   }
 
-  public void setThreshold(int threshold) {
-    _threshold = threshold;
-  }
-
-  public boolean isEmpty() {
-    return _numInputs == 0;
-  }
-
   @Nonnull
+  @Override
   public Sketch getResult() {
     return unionAll();
   }
 
-  public void apply(Sketch sketch) {
-    internalAdd(sketch);
-  }
-
-  public void merge(ThetaSketchAccumulator thetaUnion) {
-    if (thetaUnion.isEmpty()) {
-      return;
-    }
-    Sketch sketch = thetaUnion.getResult();
-    internalAdd(sketch);
-  }
-
-  private void internalAdd(Sketch sketch) {
-    if (sketch.isEmpty()) {
-      return;
-    }
-    if (_accumulator == null) {
-      _accumulator = new ArrayList<>(_threshold);
-    }
-    _accumulator.add(sketch);
-    _numInputs += 1;
-
-    if (_accumulator.size() >= _threshold) {
-      unionAll();
-    }
-  }
-
   private Sketch unionAll() {
     if (_union == null) {
       _union = _setOperationBuilder.buildUnion();
@@ -111,7 +72,7 @@ public class ThetaSketchAccumulator {
     // This single sketch might have been the result of a previously 
accumulated union and
     // would already have the parameters set.  The sketch is returned as-is 
without adjusting
     // nominal entries which requires an additional union operation.
-    if (_numInputs == 1) {
+    if (getNumInputs() == 1) {
       return _accumulator.get(0);
     }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulator.java
similarity index 58%
copy from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulator.java
index c9554ce9bf..5a24324913 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulator.java
@@ -18,100 +18,69 @@
  */
 package org.apache.pinot.segment.local.customobject;
 
-import java.util.ArrayList;
 import java.util.Comparator;
 import javax.annotation.Nonnull;
-import org.apache.datasketches.theta.SetOperationBuilder;
-import org.apache.datasketches.theta.Sketch;
-import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
 
 
 /**
- * Intermediate state used by {@code 
DistinctCountThetaSketchAggregationFunction} which gives
+ * Intermediate state used by {@code IntegerTupleSketchAggregationFunction} 
which gives
  * the end user more control over how sketches are merged for performance.
- * The end user can set parameters that trade-off more memory usage for more 
pre-aggregation.
- * This permits use of the Union "early-stop" optimisation where ordered 
sketches require no further
- * processing beyond the minimum Theta value.
+ * In particular, the Theta Sketch Union "early-stop" optimisation can be used 
- ordered sketches require no further
+ * processing beyond the minimum Theta value.  This applies to Tuple sketches 
because they are an extension of the
+ * Theta sketch.
  * The union operation initialises an empty "gadget" bookkeeping sketch that 
is updated with hashed entries
- * that fall below the minimum Theta value for all input sketches ("Broder 
Rule").  When the initial
- * Theta value is set to the minimum immediately, further gains can be 
realised.
+ * that fall below the minimum Theta value for all input sketches ("Broder 
Rule").  When the initial Theta value is
+ * set to the minimum immediately, further gains can be realised.
  */
-public class ThetaSketchAccumulator {
-  private ArrayList<Sketch> _accumulator;
-  private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
-  private Union _union;
-  private int _threshold;
-  private int _numInputs = 0;
+public class TupleIntSketchAccumulator extends 
CustomObjectAccumulator<Sketch<IntegerSummary>> {
+  private IntegerSummarySetOperations _setOperations;
+  private int _nominalEntries;
+  private Union<IntegerSummary> _union;
 
-  public ThetaSketchAccumulator() {
+  public TupleIntSketchAccumulator() {
   }
 
   // Note: The accumulator is serialized as a sketch.  This means that the 
majority of the processing
   // happens on serialization. Therefore, when deserialized, the values may be 
null and will
   // require re-initialisation. Since the primary use case is at query time 
for the Broker
   // and Server, these properties are already in memory and are re-set.
-  public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, int 
threshold) {
-    _setOperationBuilder = setOperationBuilder;
-    _threshold = threshold;
+  public TupleIntSketchAccumulator(IntegerSummarySetOperations setOperations, 
int nominalEntries, int threshold) {
+    super(threshold);
+    _nominalEntries = nominalEntries;
+    _setOperations = setOperations;
   }
 
-  public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
-    _setOperationBuilder = setOperationBuilder;
+  public void setSetOperations(IntegerSummarySetOperations setOperations) {
+    _setOperations = setOperations;
   }
 
-  public void setThreshold(int threshold) {
-    _threshold = threshold;
-  }
-
-  public boolean isEmpty() {
-    return _numInputs == 0;
+  public void setNominalEntries(int nominalEntries) {
+    _nominalEntries = nominalEntries;
   }
 
   @Nonnull
-  public Sketch getResult() {
+  @Override
+  public Sketch<IntegerSummary> getResult() {
     return unionAll();
   }
 
-  public void apply(Sketch sketch) {
-    internalAdd(sketch);
-  }
-
-  public void merge(ThetaSketchAccumulator thetaUnion) {
-    if (thetaUnion.isEmpty()) {
-      return;
-    }
-    Sketch sketch = thetaUnion.getResult();
-    internalAdd(sketch);
-  }
-
-  private void internalAdd(Sketch sketch) {
-    if (sketch.isEmpty()) {
-      return;
-    }
-    if (_accumulator == null) {
-      _accumulator = new ArrayList<>(_threshold);
-    }
-    _accumulator.add(sketch);
-    _numInputs += 1;
-
-    if (_accumulator.size() >= _threshold) {
-      unionAll();
-    }
-  }
-
-  private Sketch unionAll() {
+  private Sketch<IntegerSummary> unionAll() {
     if (_union == null) {
-      _union = _setOperationBuilder.buildUnion();
+      _union = new Union<>(_nominalEntries, _setOperations);
     }
     // Return the default update "gadget" sketch as a compact sketch
     if (isEmpty()) {
-      return _union.getResult(false, null);
+      return _union.getResult();
     }
     // Corner-case: the parameters are not strictly respected when there is a 
single sketch.
     // This single sketch might have been the result of a previously 
accumulated union and
     // would already have the parameters set.  The sketch is returned as-is 
without adjusting
     // nominal entries which requires an additional union operation.
-    if (_numInputs == 1) {
+    if (getNumInputs() == 1) {
       return _accumulator.get(0);
     }
 
@@ -125,11 +94,11 @@ public class ThetaSketchAccumulator {
     // which results in fewer redundant entries being retained and 
subsequently discarded during the
     // union operation.
     _accumulator.sort(Comparator.comparingDouble(Sketch::getTheta));
-    for (Sketch accumulatedSketch : _accumulator) {
+    for (Sketch<IntegerSummary> accumulatedSketch : _accumulator) {
       _union.union(accumulatedSketch);
     }
     _accumulator.clear();
 
-    return _union.getResult(false, null);
+    return _union.getResult();
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulatorTest.java
new file mode 100644
index 0000000000..a86144ed03
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulatorTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.stream.IntStream;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class CpcSketchAccumulatorTest {
+  private final int _lgNominalEntries = 20;
+  private final double _epsilon = 0.5;
+
+  // Builds a CPC sketch that has been updated with every integer in [startInclusive, endExclusive).
+  private CpcSketch sketchOfRange(int startInclusive, int endExclusive) {
+    CpcSketch sketch = new CpcSketch(_lgNominalEntries);
+    IntStream.range(startInclusive, endExclusive).forEach(sketch::update);
+    return sketch;
+  }
+
+  // An accumulator that never received an input reports empty and estimates zero.
+  @Test
+  public void testEmptyAccumulator() {
+    CpcSketchAccumulator emptyAccumulator = new CpcSketchAccumulator(_lgNominalEntries, 2);
+    Assert.assertTrue(emptyAccumulator.isEmpty());
+    Assert.assertEquals(emptyAccumulator.getResult().getEstimate(), 0.0);
+  }
+
+  // A single applied sketch must yield exactly the estimate of that sketch.
+  @Test
+  public void testAccumulatorWithSingleSketch() {
+    CpcSketch onlySketch = sketchOfRange(0, 1000);
+
+    CpcSketchAccumulator singleAccumulator = new CpcSketchAccumulator(_lgNominalEntries, 2);
+    singleAccumulator.apply(onlySketch);
+
+    Assert.assertFalse(singleAccumulator.isEmpty());
+    Assert.assertEquals(singleAccumulator.getResult().getEstimate(), onlySketch.getEstimate());
+  }
+
+  // Merging two accumulators over disjoint ranges should estimate the sum of both inputs.
+  @Test
+  public void testAccumulatorMerge() {
+    CpcSketch lowerSketch = sketchOfRange(0, 1000);
+    CpcSketch upperSketch = sketchOfRange(1000, 2000);
+
+    CpcSketchAccumulator target = new CpcSketchAccumulator(_lgNominalEntries, 3);
+    target.apply(lowerSketch);
+    CpcSketchAccumulator source = new CpcSketchAccumulator(_lgNominalEntries, 3);
+    source.apply(upperSketch);
+    target.merge(source);
+
+    Assert.assertEquals(target.getResult().getEstimate(), lowerSketch.getEstimate() + upperSketch.getEstimate(),
+        _epsilon);
+  }
+
+  // Two sketches applied to one accumulator should estimate the sum of both inputs.
+  @Test
+  public void testThresholdBehavior() {
+    CpcSketch lowerSketch = sketchOfRange(0, 1000);
+    CpcSketch upperSketch = sketchOfRange(1000, 2000);
+
+    CpcSketchAccumulator sharedAccumulator = new CpcSketchAccumulator(_lgNominalEntries, 3);
+    sharedAccumulator.apply(lowerSketch);
+    sharedAccumulator.apply(upperSketch);
+
+    Assert.assertEquals(sharedAccumulator.getResult().getEstimate(),
+        lowerSketch.getEstimate() + upperSketch.getEstimate(), _epsilon);
+  }
+
+  // Merging an empty accumulator must leave the target empty.
+  @Test
+  public void testUnionWithEmptyInput() {
+    CpcSketchAccumulator target = new CpcSketchAccumulator(_lgNominalEntries, 3);
+    CpcSketchAccumulator nothingToMerge = new CpcSketchAccumulator(_lgNominalEntries, 3);
+
+    target.merge(nothingToMerge);
+
+    Assert.assertTrue(target.isEmpty());
+    Assert.assertEquals(target.getResult().getEstimate(), 0.0);
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulatorTest.java
new file mode 100644
index 0000000000..7755416211
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulatorTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.stream.IntStream;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TupleIntSketchAccumulatorTest {
+  private IntegerSummarySetOperations _setOps;
+  private final int _lgK = 12;
+  // 2 ^ _lgK, expressed as a shift instead of a floating point power.
+  private final int _nominalEntries = 1 << _lgK;
+
+  @BeforeMethod
+  public void setUp() {
+    _setOps = new IntegerSummarySetOperations(IntegerSummary.Mode.Sum, IntegerSummary.Mode.Sum);
+  }
+
+  // Builds a compact integer tuple sketch holding every key in [startInclusive, endExclusive) with value 1.
+  private CompactSketch<IntegerSummary> compactSketchOfRange(int startInclusive, int endExclusive) {
+    IntegerSketch updatable = new IntegerSketch(_lgK, IntegerSummary.Mode.Sum);
+    IntStream.range(startInclusive, endExclusive).forEach(key -> updatable.update(key, 1));
+    return updatable.compact();
+  }
+
+  // An accumulator that never received an input reports empty and estimates zero.
+  @Test
+  public void testEmptyAccumulator() {
+    TupleIntSketchAccumulator emptyAccumulator = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 2);
+    Assert.assertTrue(emptyAccumulator.isEmpty());
+    Assert.assertEquals(emptyAccumulator.getResult().getEstimate(), 0.0);
+  }
+
+  // A single applied sketch must yield exactly the estimate of that sketch.
+  @Test
+  public void testAccumulatorWithSingleSketch() {
+    CompactSketch<IntegerSummary> onlySketch = compactSketchOfRange(0, 1000);
+
+    TupleIntSketchAccumulator singleAccumulator = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 2);
+    singleAccumulator.apply(onlySketch);
+
+    Assert.assertFalse(singleAccumulator.isEmpty());
+    Assert.assertEquals(singleAccumulator.getResult().getEstimate(), onlySketch.getEstimate());
+  }
+
+  // Merging two accumulators over disjoint ranges should estimate the sum of both inputs.
+  @Test
+  public void testAccumulatorMerge() {
+    CompactSketch<IntegerSummary> lowerSketch = compactSketchOfRange(0, 1000);
+    CompactSketch<IntegerSummary> upperSketch = compactSketchOfRange(1000, 2000);
+
+    TupleIntSketchAccumulator target = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+    target.apply(lowerSketch);
+    TupleIntSketchAccumulator source = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+    source.apply(upperSketch);
+    target.merge(source);
+
+    Assert.assertEquals(target.getResult().getEstimate(), lowerSketch.getEstimate() + upperSketch.getEstimate());
+  }
+
+  // Two sketches applied to one accumulator should estimate the sum of both inputs.
+  @Test
+  public void testThresholdBehavior() {
+    CompactSketch<IntegerSummary> lowerSketch = compactSketchOfRange(0, 1000);
+    CompactSketch<IntegerSummary> upperSketch = compactSketchOfRange(1000, 2000);
+
+    TupleIntSketchAccumulator sharedAccumulator = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+    sharedAccumulator.apply(lowerSketch);
+    sharedAccumulator.apply(upperSketch);
+
+    Assert.assertEquals(sharedAccumulator.getResult().getEstimate(),
+        lowerSketch.getEstimate() + upperSketch.getEstimate());
+  }
+
+  // Merging an empty accumulator must leave the target empty.
+  @Test
+  public void testUnionWithEmptyInput() {
+    TupleIntSketchAccumulator target = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+    TupleIntSketchAccumulator nothingToMerge = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+
+    target.merge(nothingToMerge);
+
+    Assert.assertTrue(target.isEmpty());
+    Assert.assertEquals(target.getResult().getEstimate(), 0.0);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to