This is an automated email from the ASF dual-hosted git repository.

atri pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d7aed2e863 Funnel Count - Multiple Strategies (no partitioning 
requisites) (#11092)
d7aed2e863 is described below

commit d7aed2e8631ac1c0ee107d520f334f067117fbc1
Author: dario-liberman <[email protected]>
AuthorDate: Wed Aug 30 14:40:07 2023 +0200

    Funnel Count - Multiple Strategies (no partitioning requisites) (#11092)
    
    * FUNNEL_COUNT - aggregation strategies
    
    * FUNNEL_COUNT - Aggregation Strategies Tests
    
    * FUNNEL_COUNT - Aggregation Strategies Tests
    
    * Refactor: Move strategy creation into a factory, make funnel count 
aggregation function parametric
    
    * Add license headers
    
    * Simplify factory by postponing strategy construction and templatizing 
sorted split
    
    * Fix linter errors
    
    ---------
    
    Co-authored-by: Dario Liberman <[email protected]>
---
 .../function/AggregationFunctionFactory.java       |   3 +-
 .../function/FunnelCountAggregationFunction.java   | 511 ---------------------
 .../function/funnel/AggregationStrategy.java       | 167 +++++++
 .../function/funnel/BitmapAggregationStrategy.java |  44 ++
 .../function/funnel/BitmapMergeStrategy.java       |  52 +++
 .../funnel/BitmapResultExtractionStrategy.java     |  85 ++++
 .../function/funnel/DictIdsWrapper.java            |  36 ++
 .../funnel/FunnelCountAggregationFunction.java     | 188 ++++++++
 .../FunnelCountAggregationFunctionFactory.java     | 272 +++++++++++
 .../FunnelCountSortedAggregationFunction.java      | 129 ++++++
 .../aggregation/function/funnel/MergeStrategy.java |  37 ++
 .../function/funnel/PartitionedMergeStrategy.java  |  50 ++
 .../function/funnel/ResultExtractionStrategy.java  |  52 +++
 .../function/funnel/SetMergeStrategy.java          |  52 +++
 .../funnel/SetResultExtractionStrategy.java        |  94 ++++
 .../function/funnel/SortedAggregationResult.java   |  67 +++
 .../function/funnel/SortedAggregationStrategy.java |  44 ++
 .../funnel/ThetaSketchAggregationStrategy.java     |  73 +++
 .../function/funnel/ThetaSketchMergeStrategy.java  |  62 +++
 .../ThetaSketchResultExtractionStrategy.java       |  45 ++
 .../pinot/queries/BaseFunnelCountQueriesTest.java  |  41 +-
 ...Test.java => FunnelCountQueriesBitmapTest.java} |  34 +-
 ...> FunnelCountQueriesPartitionedSortedTest.java} |  22 +-
 ...java => FunnelCountQueriesPartitionedTest.java} |  28 +-
 ...tedTest.java => FunnelCountQueriesSetTest.java} |  34 +-
 ...java => FunnelCountQueriesThetaSketchTest.java} |  34 +-
 26 files changed, 1708 insertions(+), 548 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 0f03ee8723..504a09391c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FunctionContext;
+import 
org.apache.pinot.core.query.aggregation.function.funnel.FunnelCountAggregationFunctionFactory;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
@@ -352,7 +353,7 @@ public class AggregationFunctionFactory {
             throw new IllegalArgumentException(
                 "Aggregation function: " + function + " is only supported in 
selection without alias.");
           case FUNNELCOUNT:
-            return new FunnelCountAggregationFunction(arguments);
+            return new FunnelCountAggregationFunctionFactory(arguments).get();
 
           default:
             throw new IllegalArgumentException();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java
deleted file mode 100644
index 4eecad1002..0000000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.aggregation.function;
-
-import com.google.common.base.Preconditions;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
-import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.roaringbitmap.RoaringBitmap;
-
-
-/**
- * The {@code FunnelCountAggregationFunction} calculates the number of step 
conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being 
partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition 
key within the segment, then sums up the
- * results from different segments.
- *
- * Example:
- *   SELECT
- *    dateTrunc('day', timestamp) AS ts,
- *    FUNNEL_COUNT(
- *      STEPS(url = '/addToCart', url = '/checkout', url = 
'/orderConfirmation'),
- *      CORRELATED_BY(user)
- *    ) as step_counts
- *    FROM user_log
- *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
- *    GROUP BY 1
- */
-public class FunnelCountAggregationFunction implements 
AggregationFunction<List<Long>, LongArrayList> {
-  final List<ExpressionContext> _expressions;
-  final List<ExpressionContext> _stepExpressions;
-  final List<ExpressionContext> _correlateByExpressions;
-  final ExpressionContext _primaryCorrelationCol;
-  final int _numSteps;
-
-  final SegmentAggregationStrategy<?, List<Long>> _sortedAggregationStrategy;
-  final SegmentAggregationStrategy<?, List<Long>> _bitmapAggregationStrategy;
-
-  public FunnelCountAggregationFunction(List<ExpressionContext> expressions) {
-    _expressions = expressions;
-    _correlateByExpressions = 
Option.CORRELATE_BY.getInputExpressions(expressions);
-    _primaryCorrelationCol = 
Option.CORRELATE_BY.getFirstInputExpression(expressions);
-    _stepExpressions = Option.STEPS.getInputExpressions(expressions);
-    _numSteps = _stepExpressions.size();
-    _sortedAggregationStrategy = new SortedAggregationStrategy();
-    _bitmapAggregationStrategy = new BitmapAggregationStrategy();
-  }
-
-  @Override
-  public String getResultColumnName() {
-    return getType().getName().toLowerCase() + "(" + 
_expressions.stream().map(ExpressionContext::toString)
-        .collect(Collectors.joining(",")) + ")";
-  }
-
-  @Override
-  public List<ExpressionContext> getInputExpressions() {
-    final List<ExpressionContext> inputs = new ArrayList<>();
-    inputs.addAll(_correlateByExpressions);
-    inputs.addAll(_stepExpressions);
-    return inputs;
-  }
-
-  @Override
-  public AggregationFunctionType getType() {
-    return AggregationFunctionType.FUNNELCOUNT;
-  }
-
-  @Override
-  public AggregationResultHolder createAggregationResultHolder() {
-    return new ObjectAggregationResultHolder();
-  }
-
-  @Override
-  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
-    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
-  }
-
-  @Override
-  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregate(length, 
aggregationResultHolder, blockValSetMap);
-  }
-
-  @Override
-  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    
getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupBySV(length,
 groupKeyArray,
-        groupByResultHolder, blockValSetMap);
-  }
-
-  @Override
-  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    
getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupByMV(length,
 groupKeysArray,
-        groupByResultHolder, blockValSetMap);
-  }
-
-  @Override
-  public List<Long> extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
-    return 
getAggregationStrategyByAggregationResult(aggregationResultHolder.getResult()).extractAggregationResult(
-        aggregationResultHolder);
-  }
-
-  @Override
-  public List<Long> extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
-    return 
getAggregationStrategyByAggregationResult(groupByResultHolder.getResult(groupKey)).extractGroupByResult(
-        groupByResultHolder, groupKey);
-  }
-
-  @Override
-  public List<Long> merge(List<Long> a, List<Long> b) {
-    int length = a.size();
-    Preconditions.checkState(length == b.size(), "The two operand arrays are 
not of the same size! provided %s, %s",
-        length, b.size());
-
-    LongArrayList result = toLongArrayList(a);
-    long[] elements = result.elements();
-    for (int i = 0; i < length; i++) {
-      elements[i] += b.get(i);
-    }
-    return result;
-  }
-
-  @Override
-  public ColumnDataType getIntermediateResultColumnType() {
-    return ColumnDataType.OBJECT;
-  }
-
-  @Override
-  public ColumnDataType getFinalResultColumnType() {
-    return ColumnDataType.LONG_ARRAY;
-  }
-
-  @Override
-  public LongArrayList extractFinalResult(List<Long> result) {
-    return toLongArrayList(result);
-  }
-
-  @Override
-  public String toExplainString() {
-    StringBuilder stringBuilder = new 
StringBuilder(getType().getName()).append('(');
-    int numArguments = getInputExpressions().size();
-    if (numArguments > 0) {
-      stringBuilder.append(getInputExpressions().get(0).toString());
-      for (int i = 1; i < numArguments; i++) {
-        stringBuilder.append(", 
").append(getInputExpressions().get(i).toString());
-      }
-    }
-    return stringBuilder.append(')').toString();
-  }
-
-  private static LongArrayList toLongArrayList(List<Long> longList) {
-    return longList instanceof LongArrayList ? ((LongArrayList) 
longList).clone() : new LongArrayList(longList);
-  }
-
-  private int[] getCorrelationIds(Map<ExpressionContext, BlockValSet> 
blockValSetMap) {
-    return blockValSetMap.get(_primaryCorrelationCol).getDictionaryIdsSV();
-  }
-
-  private int[][] getSteps(Map<ExpressionContext, BlockValSet> blockValSetMap) 
{
-    final int[][] steps = new int[_numSteps][];
-    for (int n = 0; n < _numSteps; n++) {
-      final BlockValSet stepBlockValSet = 
blockValSetMap.get(_stepExpressions.get(n));
-      steps[n] = stepBlockValSet.getIntValuesSV();
-    }
-    return steps;
-  }
-
-  private boolean isSorted(Map<ExpressionContext, BlockValSet> blockValSetMap) 
{
-    final Dictionary primaryCorrelationDictionary = 
blockValSetMap.get(_primaryCorrelationCol).getDictionary();
-    if (primaryCorrelationDictionary == null) {
-      throw new IllegalArgumentException(
-          "CORRELATE_BY column in FUNNELCOUNT aggregation function not 
supported, please use a dictionary encoded "
-              + "column.");
-    }
-    return primaryCorrelationDictionary.isSorted();
-  }
-
-  private SegmentAggregationStrategy<?, List<Long>> 
getAggregationStrategyByBlockValSetMap(
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    return isSorted(blockValSetMap) ? _sortedAggregationStrategy : 
_bitmapAggregationStrategy;
-  }
-
-  private SegmentAggregationStrategy<?, List<Long>> 
getAggregationStrategyByAggregationResult(Object aggResult) {
-    return aggResult instanceof SortedAggregationResult ? 
_sortedAggregationStrategy : _bitmapAggregationStrategy;
-  }
-
-  enum Option {
-    STEPS("steps"),
-    CORRELATE_BY("correlateby");
-
-    final String _name;
-
-    Option(String name) {
-      _name = name;
-    }
-
-    boolean matches(ExpressionContext expression) {
-      if (expression.getType() != ExpressionContext.Type.FUNCTION) {
-        return false;
-      }
-      return _name.equals(expression.getFunction().getFunctionName());
-    }
-
-    Optional<ExpressionContext> find(List<ExpressionContext> expressions) {
-      return expressions.stream().filter(this::matches).findFirst();
-    }
-
-    public List<ExpressionContext> getInputExpressions(List<ExpressionContext> 
expressions) {
-      return this.find(expressions).map(exp -> 
exp.getFunction().getArguments())
-          .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT requires " 
+ _name));
-    }
-
-    public ExpressionContext getFirstInputExpression(List<ExpressionContext> 
expressions) {
-      return this.find(expressions)
-          .flatMap(exp -> 
exp.getFunction().getArguments().stream().findFirst())
-          .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT: " + _name 
+ " requires an argument."));
-    }
-  }
-
-  /**
-   * Interface for segment aggregation strategy.
-   *
-   * <p>The implementation should be stateless, and can be shared among 
multiple segments in multiple threads. The
-   * result for each segment should be stored and passed in via the result 
holder.
-   * There should be no assumptions beyond segment boundaries, different 
aggregation strategies may be utilized
-   * across different segments for a given query.
-   *
-   * @param <A> Aggregation result accumulated across blocks within segment, 
kept by result holder.
-   * @param <I> Intermediate result at segment level (extracted from 
aforementioned aggregation result).
-   */
-  @ThreadSafe
-  static abstract class SegmentAggregationStrategy<A, I> {
-
-    /**
-     * Returns an aggregation result for this aggregation strategy to be kept 
in a result holder (aggregation only).
-     */
-    abstract A createAggregationResult();
-
-    public A getAggregationResultGroupBy(GroupByResultHolder 
groupByResultHolder, int groupKey) {
-      A aggResult = groupByResultHolder.getResult(groupKey);
-      if (aggResult == null) {
-        aggResult = createAggregationResult();
-        groupByResultHolder.setValueForKey(groupKey, aggResult);
-      }
-      return aggResult;
-    }
-
-    public A getAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
-      A aggResult = aggregationResultHolder.getResult();
-      if (aggResult == null) {
-        aggResult = createAggregationResult();
-        aggregationResultHolder.setValue(aggResult);
-      }
-      return aggResult;
-    }
-
-    /**
-     * Performs aggregation on the given block value sets (aggregation only).
-     */
-    abstract void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
-        Map<ExpressionContext, BlockValSet> blockValSetMap);
-
-    /**
-     * Performs aggregation on the given group key array and block value sets 
(aggregation group-by on single-value
-     * columns).
-     */
-    abstract void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
-        Map<ExpressionContext, BlockValSet> blockValSetMap);
-
-    /**
-     * Performs aggregation on the given group keys array and block value sets 
(aggregation group-by on multi-value
-     * columns).
-     */
-    abstract void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
-        Map<ExpressionContext, BlockValSet> blockValSetMap);
-
-    /**
-     * Extracts the intermediate result from the aggregation result holder 
(aggregation only).
-     */
-    public I extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
-      return extractIntermediateResult(aggregationResultHolder.getResult());
-    }
-
-    /**
-     * Extracts the intermediate result from the group-by result holder for 
the given group key (aggregation group-by).
-     */
-    public I extractGroupByResult(GroupByResultHolder groupByResultHolder, int 
groupKey) {
-      return 
extractIntermediateResult(groupByResultHolder.getResult(groupKey));
-    }
-
-    abstract I extractIntermediateResult(A aggregationResult);
-  }
-
-  /**
-   * Aggregation strategy leveraging roaring bitmap algebra 
(unions/intersections).
-   */
-  class BitmapAggregationStrategy extends 
SegmentAggregationStrategy<RoaringBitmap[], List<Long>> {
-
-    @Override
-    public RoaringBitmap[] createAggregationResult() {
-      final RoaringBitmap[] stepsBitmaps = new RoaringBitmap[_numSteps];
-      for (int n = 0; n < _numSteps; n++) {
-        stepsBitmaps[n] = new RoaringBitmap();
-      }
-      return stepsBitmaps;
-    }
-
-    @Override
-    public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
-        Map<ExpressionContext, BlockValSet> blockValSetMap) {
-      final int[] correlationIds = getCorrelationIds(blockValSetMap);
-      final int[][] steps = getSteps(blockValSetMap);
-
-      final RoaringBitmap[] stepsBitmaps = 
getAggregationResult(aggregationResultHolder);
-
-      for (int n = 0; n < _numSteps; n++) {
-        for (int i = 0; i < length; i++) {
-          if (steps[n][i] > 0) {
-            stepsBitmaps[n].add(correlationIds[i]);
-          }
-        }
-      }
-    }
-
-    @Override
-    public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
-        Map<ExpressionContext, BlockValSet> blockValSetMap) {
-      final int[] correlationIds = getCorrelationIds(blockValSetMap);
-      final int[][] steps = getSteps(blockValSetMap);
-
-      for (int n = 0; n < _numSteps; n++) {
-        for (int i = 0; i < length; i++) {
-          final int groupKey = groupKeyArray[i];
-          if (steps[n][i] > 0) {
-            getAggregationResultGroupBy(groupByResultHolder, 
groupKey)[n].add(correlationIds[i]);
-          }
-        }
-      }
-    }
-
-    @Override
-    public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
-        Map<ExpressionContext, BlockValSet> blockValSetMap) {
-      final int[] correlationIds = getCorrelationIds(blockValSetMap);
-      final int[][] steps = getSteps(blockValSetMap);
-
-      for (int n = 0; n < _numSteps; n++) {
-        for (int i = 0; i < length; i++) {
-          for (int groupKey : groupKeysArray[i]) {
-            if (steps[n][i] > 0) {
-              getAggregationResultGroupBy(groupByResultHolder, 
groupKey)[n].add(correlationIds[i]);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    public List<Long> extractIntermediateResult(RoaringBitmap[] stepsBitmaps) {
-      if (stepsBitmaps == null) {
-        return new LongArrayList(_numSteps);
-      }
-
-      long[] result = new long[_numSteps];
-      result[0] = stepsBitmaps[0].getCardinality();
-      for (int i = 1; i < _numSteps; i++) {
-        // intersect this step with previous step
-        stepsBitmaps[i].and(stepsBitmaps[i - 1]);
-        result[i] = stepsBitmaps[i].getCardinality();
-      }
-      return LongArrayList.wrap(result);
-    }
-  }
-
-  /**
-   * Aggregation strategy for segments sorted by the main correlation column.
-   */
-  class SortedAggregationStrategy extends 
SegmentAggregationStrategy<SortedAggregationResult, List<Long>> {
-
-    @Override
-    public SortedAggregationResult createAggregationResult() {
-      return new SortedAggregationResult();
-    }
-
-    @Override
-    public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
-        Map<ExpressionContext, BlockValSet> blockValSetMap) {
-      final int[] correlationIds = getCorrelationIds(blockValSetMap);
-      final int[][] steps = getSteps(blockValSetMap);
-
-      final SortedAggregationResult agg = 
getAggregationResult(aggregationResultHolder);
-
-      for (int i = 0; i < length; i++) {
-        agg.sortedCount(steps, i, correlationIds[i]);
-      }
-    }
-
-    @Override
-    public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
-        Map<ExpressionContext, BlockValSet> blockValSetMap) {
-      final int[] correlationIds = getCorrelationIds(blockValSetMap);
-      final int[][] steps = getSteps(blockValSetMap);
-
-      for (int i = 0; i < length; i++) {
-        final int groupKey = groupKeyArray[i];
-        final SortedAggregationResult agg = 
getAggregationResultGroupBy(groupByResultHolder, groupKey);
-
-        agg.sortedCount(steps, i, correlationIds[i]);
-      }
-    }
-
-    @Override
-    public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
-        Map<ExpressionContext, BlockValSet> blockValSetMap) {
-      final int[] correlationIds = getCorrelationIds(blockValSetMap);
-      final int[][] steps = getSteps(blockValSetMap);
-
-      for (int i = 0; i < length; i++) {
-        for (int groupKey : groupKeysArray[i]) {
-          final SortedAggregationResult agg = 
getAggregationResultGroupBy(groupByResultHolder, groupKey);
-
-          agg.sortedCount(steps, i, correlationIds[i]);
-        }
-      }
-    }
-
-    @Override
-    public List<Long> extractIntermediateResult(SortedAggregationResult agg) {
-      if (agg == null) {
-        return new LongArrayList(_numSteps);
-      }
-
-      return LongArrayList.wrap(agg.extractResult());
-    }
-  }
-
-  /**
-   * Aggregation result data structure leveraged by sorted aggregation 
strategy.
-   */
-  class SortedAggregationResult {
-    public long[] _stepCounters = new long[_numSteps];
-    public int _lastCorrelationId = Integer.MIN_VALUE;
-    public boolean[] _correlatedSteps = new boolean[_numSteps];
-
-    public void sortedCount(int[][] steps, int i, int correlationId) {
-      if (correlationId == _lastCorrelationId) {
-        // same correlation as before, keep accumulating.
-        for (int n = 0; n < _numSteps; n++) {
-          _correlatedSteps[n] |= steps[n][i] > 0;
-        }
-      } else {
-        // End of correlation group, calculate funnel conversion counts
-        incrStepCounters();
-
-        // initialize next correlation group
-        for (int n = 0; n < _numSteps; n++) {
-          _correlatedSteps[n] = steps[n][i] > 0;
-        }
-        _lastCorrelationId = correlationId;
-      }
-    }
-
-    void incrStepCounters() {
-      for (int n = 0; n < _numSteps; n++) {
-        if (!_correlatedSteps[n]) {
-          break;
-        }
-        _stepCounters[n]++;
-      }
-    }
-
-    public long[] extractResult() {
-      // count last correlation id left open
-      incrStepCounters();
-
-      return _stepCounters;
-    }
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/AggregationStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/AggregationStrategy.java
new file mode 100644
index 0000000000..298fd4a805
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/AggregationStrategy.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Interface for within segment aggregation strategy.
+ *
+ * <p>The implementation should be stateless, and can be shared among multiple 
segments in multiple threads. The
+ * result for each segment should be stored and passed in via the result 
holder.
+ * There should be no assumptions beyond segment boundaries, different 
aggregation strategies may be utilized
+ * across different segments for a given query.
+ *
+ * @param <A> Aggregation result accumulated across blocks within segment, 
kept by result holder.
+ */
+@ThreadSafe
+public abstract class AggregationStrategy<A> {
+
+  protected final int _numSteps;
+  private final List<ExpressionContext> _stepExpressions;
+  private final List<ExpressionContext> _correlateByExpressions;
+  private final ExpressionContext _primaryCorrelationCol;
+
+  public AggregationStrategy(List<ExpressionContext> stepExpressions, 
List<ExpressionContext> correlateByExpressions) {
+    _stepExpressions = stepExpressions;
+    _correlateByExpressions = correlateByExpressions;
+    _primaryCorrelationCol = _correlateByExpressions.get(0);
+    _numSteps = _stepExpressions.size();
+  }
+
+  /**
+   * Returns an aggregation result for this aggregation strategy to be kept in 
a result holder (aggregation only).
+   */
+  abstract A createAggregationResult(Dictionary dictionary);
+
+  public A getAggregationResultGroupBy(Dictionary dictionary, 
GroupByResultHolder groupByResultHolder, int groupKey) {
+    A aggResult = groupByResultHolder.getResult(groupKey);
+    if (aggResult == null) {
+      aggResult = createAggregationResult(dictionary);
+      groupByResultHolder.setValueForKey(groupKey, aggResult);
+    }
+    return aggResult;
+  }
+
+  public A getAggregationResult(Dictionary dictionary, AggregationResultHolder 
aggregationResultHolder) {
+    A aggResult = aggregationResultHolder.getResult();
+    if (aggResult == null) {
+      aggResult = createAggregationResult(dictionary);
+      aggregationResultHolder.setValue(aggResult);
+    }
+    return aggResult;
+  }
+
+  /**
+   * Performs aggregation on the given block value sets (aggregation only).
+   */
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary dictionary = getDictionary(blockValSetMap);
+    final int[] correlationIds = getCorrelationIds(blockValSetMap);
+    final int[][] steps = getSteps(blockValSetMap);
+
+    final A aggResult = getAggregationResult(dictionary, 
aggregationResultHolder);
+    for (int i = 0; i < length; i++) {
+      for (int n = 0; n < _numSteps; n++) {
+        if (steps[n][i] > 0) {
+          add(dictionary, aggResult, n, correlationIds[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Performs aggregation on the given group key array and block value sets 
(aggregation group-by on single-value
+   * columns).
+   */
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary dictionary = getDictionary(blockValSetMap);
+    final int[] correlationIds = getCorrelationIds(blockValSetMap);
+    final int[][] steps = getSteps(blockValSetMap);
+
+    for (int i = 0; i < length; i++) {
+      for (int n = 0; n < _numSteps; n++) {
+        final int groupKey = groupKeyArray[i];
+        final A aggResult = getAggregationResultGroupBy(dictionary, 
groupByResultHolder, groupKey);
+        if (steps[n][i] > 0) {
+          add(dictionary, aggResult, n, correlationIds[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Performs aggregation on the given group keys array and block value sets 
(aggregation group-by on multi-value
+   * columns).
+   */
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary dictionary = getDictionary(blockValSetMap);
+    final int[] correlationIds = getCorrelationIds(blockValSetMap);
+    final int[][] steps = getSteps(blockValSetMap);
+
+    for (int i = 0; i < length; i++) {
+      for (int n = 0; n < _numSteps; n++) {
+        for (int groupKey : groupKeysArray[i]) {
+          final A aggResult = getAggregationResultGroupBy(dictionary, 
groupByResultHolder, groupKey);
+          if (steps[n][i] > 0) {
+            add(dictionary, aggResult, n, correlationIds[i]);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Adds a correlation id to the aggregation counter for a given step in the 
funnel.
+   */
+  abstract void add(Dictionary dictionary, A aggResult, int step, int 
correlationId);
+
+  private Dictionary getDictionary(Map<ExpressionContext, BlockValSet> 
blockValSetMap) {
+    final Dictionary primaryCorrelationDictionary = 
blockValSetMap.get(_primaryCorrelationCol).getDictionary();
+    Preconditions.checkArgument(primaryCorrelationDictionary != null,
+        "CORRELATE_BY column in FUNNELCOUNT aggregation function not 
supported, please use a dictionary encoded "
+            + "column.");
+    return primaryCorrelationDictionary;
+  }
+
+  private int[] getCorrelationIds(Map<ExpressionContext, BlockValSet> 
blockValSetMap) {
+    return blockValSetMap.get(_primaryCorrelationCol).getDictionaryIdsSV();
+  }
+
+  private int[][] getSteps(Map<ExpressionContext, BlockValSet> blockValSetMap) 
{
+    final int[][] steps = new int[_numSteps][];
+    for (int n = 0; n < _numSteps; n++) {
+      final BlockValSet stepBlockValSet = 
blockValSetMap.get(_stepExpressions.get(n));
+      steps[n] = stepBlockValSet.getIntValuesSV();
+    }
+    return steps;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapAggregationStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapAggregationStrategy.java
new file mode 100644
index 0000000000..f726d93620
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapAggregationStrategy.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Aggregation strategy that accumulates correlation ids in one roaring bitmap per funnel step, so that the steps
+ * can later be combined with bitmap algebra (unions/intersections).
+ */
+class BitmapAggregationStrategy extends AggregationStrategy<DictIdsWrapper> {
+  public BitmapAggregationStrategy(List<ExpressionContext> stepExpressions,
+      List<ExpressionContext> correlateByExpressions) {
+    super(stepExpressions, correlateByExpressions);
+  }
+
+  /**
+   * Creates a fresh wrapper holding {@code _numSteps} bitmaps together with the given dictionary.
+   */
+  @Override
+  public DictIdsWrapper createAggregationResult(Dictionary dictionary) {
+    final DictIdsWrapper emptyResult = new DictIdsWrapper(_numSteps, dictionary);
+    return emptyResult;
+  }
+
+  /**
+   * Records the correlation id in the bitmap of the given step; the dictionary argument is not needed here.
+   */
+  @Override
+  protected void add(Dictionary dictionary, DictIdsWrapper dictIdsWrapper, int step, int correlationId) {
+    dictIdsWrapper._stepsBitmaps[step].add(correlationId);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapMergeStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapMergeStrategy.java
new file mode 100644
index 0000000000..0be8bbadbe
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapMergeStrategy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.List;
+import org.roaringbitmap.RoaringBitmap;
+
+
+class BitmapMergeStrategy implements MergeStrategy<List<RoaringBitmap>> {
+  protected final int _numSteps;
+
+  BitmapMergeStrategy(int numSteps) {
+    _numSteps = numSteps;
+  }
+
+  @Override
+  public List<RoaringBitmap> merge(List<RoaringBitmap> intermediateResult1, 
List<RoaringBitmap> intermediateResult2) {
+    for (int i = 0; i < _numSteps; i++) {
+      intermediateResult1.get(i).or(intermediateResult2.get(i));
+    }
+    return intermediateResult1;
+  }
+
+  @Override
+  public LongArrayList extractFinalResult(List<RoaringBitmap> stepsBitmaps) {
+    long[] result = new long[_numSteps];
+    result[0] = stepsBitmaps.get(0).getCardinality();
+    for (int i = 1; i < _numSteps; i++) {
+      // intersect this step with previous step
+      stepsBitmaps.get(i).and(stepsBitmaps.get(i - 1));
+      result[i] = stepsBitmaps.get(i).getCardinality();
+    }
+    return LongArrayList.wrap(result);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapResultExtractionStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapResultExtractionStrategy.java
new file mode 100644
index 0000000000..b60bad7e0c
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapResultExtractionStrategy.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Result extraction strategy that turns the dictionary ids accumulated in a {@link DictIdsWrapper} into one roaring
+ * bitmap per funnel step.
+ */
+class BitmapResultExtractionStrategy implements ResultExtractionStrategy<DictIdsWrapper, List<RoaringBitmap>> {
+  protected final int _numSteps;
+
+  BitmapResultExtractionStrategy(int numSteps) {
+    _numSteps = numSteps;
+  }
+
+  /**
+   * Converts every step bitmap of dictionary ids into a bitmap of values (or value hash codes), keeping step order.
+   */
+  @Override
+  public List<RoaringBitmap> extractIntermediateResult(DictIdsWrapper dictIdsWrapper) {
+    final RoaringBitmap[] dictIdBitmaps = dictIdsWrapper._stepsBitmaps;
+    final List<RoaringBitmap> valueBitmaps = new ArrayList<>(_numSteps);
+    for (int step = 0; step < dictIdBitmaps.length; step++) {
+      valueBitmaps.add(convertToValueBitmap(dictIdsWrapper._dictionary, dictIdBitmaps[step]));
+    }
+    return valueBitmaps;
+  }
+
+  /**
+   * Reads the dictionary and maps each dictionary id to the int value (INT columns) or to the hash code of the value
+   * (LONG, FLOAT, DOUBLE and STRING columns). Any other value type is rejected.
+   */
+  private RoaringBitmap convertToValueBitmap(Dictionary dictionary, RoaringBitmap dictIdBitmap) {
+    final FieldSpec.DataType valueType = dictionary.getValueType();
+    final PeekableIntIterator dictIds = dictIdBitmap.getIntIterator();
+    final RoaringBitmap values = new RoaringBitmap();
+    switch (valueType) {
+      case INT:
+        while (dictIds.hasNext()) {
+          values.add(dictionary.getIntValue(dictIds.next()));
+        }
+        break;
+      case LONG:
+        while (dictIds.hasNext()) {
+          values.add(Long.hashCode(dictionary.getLongValue(dictIds.next())));
+        }
+        break;
+      case FLOAT:
+        while (dictIds.hasNext()) {
+          values.add(Float.hashCode(dictionary.getFloatValue(dictIds.next())));
+        }
+        break;
+      case DOUBLE:
+        while (dictIds.hasNext()) {
+          values.add(Double.hashCode(dictionary.getDoubleValue(dictIds.next())));
+        }
+        break;
+      case STRING:
+        while (dictIds.hasNext()) {
+          values.add(dictionary.getStringValue(dictIds.next()).hashCode());
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Illegal data type for FUNNEL_COUNT aggregation function: " + valueType);
+    }
+    return values;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/DictIdsWrapper.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/DictIdsWrapper.java
new file mode 100644
index 0000000000..c09d0128f2
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/DictIdsWrapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Holder pairing a dictionary with one roaring bitmap per funnel step; every bitmap starts out empty.
+ */
+final class DictIdsWrapper {
+  final Dictionary _dictionary;
+  final RoaringBitmap[] _stepsBitmaps;
+
+  DictIdsWrapper(int numSteps, Dictionary dictionary) {
+    final RoaringBitmap[] bitmaps = new RoaringBitmap[numSteps];
+    for (int step = 0; step < bitmaps.length; step++) {
+      bitmaps[step] = new RoaringBitmap();
+    }
+    _stepsBitmaps = bitmaps;
+    _dictionary = dictionary;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunction.java
new file mode 100644
index 0000000000..3c258277db
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunction.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code FunnelCountAggregationFunction} calculates the number of 
conversions for a given correlation column and
+ * a list of steps as boolean expressions.
+ *
+ * @param <A> Aggregation result accumulated across blocks within segment, 
kept by result holder.
+ * @param <I> Intermediate result at segment level (extracted from 
aforementioned aggregation result).
+ *
+ * Example:
+ *   SELECT
+ *    dateTrunc('day', timestamp) AS ts,
+ *    FUNNEL_COUNT(
+ *      STEPS(url = '/addToCart', url = '/checkout', url = 
'/orderConfirmation'),
+ *      CORRELATE_BY(user_id)
+ *    ) as step_counts
+ *    FROM user_log
+ *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
+ *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for 
example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATE_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ * Please refer to {@link FunnelCountAggregationFunctionFactory} to learn 
about counting strategies available.
+ *
+ * @see FunnelCountAggregationFunctionFactory
+ * @see FunnelCountSortedAggregationFunction
+ */
+public class FunnelCountAggregationFunction<A, I> implements 
AggregationFunction<I, LongArrayList> {
+  private final List<ExpressionContext> _expressions;
+  private final List<ExpressionContext> _stepExpressions;
+  private final List<ExpressionContext> _correlateByExpressions;
+  private final int _numSteps;
+
+  private final AggregationStrategy<A> _aggregationStrategy;
+  private final ResultExtractionStrategy<A, I> _resultExtractionStrategy;
+  private final MergeStrategy<I> _mergeStrategy;
+
+  public FunnelCountAggregationFunction(List<ExpressionContext> expressions, 
List<ExpressionContext> stepExpressions,
+      List<ExpressionContext> correlateByExpressions, AggregationStrategy<A> 
aggregationStrategy,
+      ResultExtractionStrategy<A, I> resultExtractionStrategy, 
MergeStrategy<I> mergeStrategy) {
+    _expressions = expressions;
+    _stepExpressions = stepExpressions;
+    _correlateByExpressions = correlateByExpressions;
+    _aggregationStrategy = aggregationStrategy;
+    _resultExtractionStrategy = resultExtractionStrategy;
+    _mergeStrategy = mergeStrategy;
+    _numSteps = _stepExpressions.size();
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return getType().getName().toLowerCase() + "(" + 
_expressions.stream().map(ExpressionContext::toString)
+        .collect(Collectors.joining(",")) + ")";
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    final List<ExpressionContext> inputs = new ArrayList<>();
+    inputs.addAll(_correlateByExpressions);
+    inputs.addAll(_stepExpressions);
+    return inputs;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FUNNELCOUNT;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _aggregationStrategy.aggregate(length, aggregationResultHolder, 
blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _aggregationStrategy.aggregateGroupBySV(length, groupKeyArray, 
groupByResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _aggregationStrategy.aggregateGroupByMV(length, groupKeysArray, 
groupByResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public I extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    return 
_resultExtractionStrategy.extractAggregationResult(aggregationResultHolder);
+  }
+
+  @Override
+  public I extractGroupByResult(GroupByResultHolder groupByResultHolder, int 
groupKey) {
+    return _resultExtractionStrategy.extractGroupByResult(groupByResultHolder, 
groupKey);
+  }
+
+  @Override
+  public I merge(I a, I b) {
+    if (a == null) {
+      return b;
+    }
+    if (b == null) {
+      return a;
+    }
+    return _mergeStrategy.merge(a, b);
+  }
+
+  @Override
+  public LongArrayList extractFinalResult(I intermediateResult) {
+    if (intermediateResult == null) {
+      return new LongArrayList(_numSteps);
+    }
+    return _mergeStrategy.extractFinalResult(intermediateResult);
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG_ARRAY;
+  }
+
+  @Override
+  public String toExplainString() {
+    StringBuilder stringBuilder = new 
StringBuilder(getType().getName()).append('(');
+    int numArguments = getInputExpressions().size();
+    if (numArguments > 0) {
+      stringBuilder.append(getInputExpressions().get(0).toString());
+      for (int i = 1; i < numArguments; i++) {
+        stringBuilder.append(", 
").append(getInputExpressions().get(i).toString());
+      }
+    }
+    return stringBuilder.append(')').toString();
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunctionFactory.java
new file mode 100644
index 0000000000..dc9c14b3d4
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountAggregationFunctionFactory.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.thetacommon.ThetaUtil;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.DistinctCountAggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.DistinctCountBitmapAggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.DistinctCountThetaSketchAggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.SegmentPartitionedDistinctCountAggregationFunction;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The {@code FunnelCountAggregationFunctionFactory} builds a {@code FunnelCountAggregationFunction}.
+ * Primary role is to validate inputs and select the appropriate aggregation strategy to use based on settings.
+ *
+ * The following SETTINGS are available; the counting strategies mirror the corresponding distinct count
+ * implementations as per below.
+ *  <ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only available in combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul>
+ */
+public class FunnelCountAggregationFunctionFactory implements Supplier<AggregationFunction> {
+  final List<ExpressionContext> _expressions;
+  final List<ExpressionContext> _stepExpressions;
+  final List<ExpressionContext> _correlateByExpressions;
+  final ExpressionContext _primaryCorrelationCol;
+  final int _numSteps;
+  final int _nominalEntries;
+  final boolean _partitionSetting;
+  final boolean _sortingSetting;
+  final boolean _thetaSketchSetting;
+  final boolean _setSetting;
+
+  /**
+   * Validates the FUNNELCOUNT arguments and reads the STEPS, CORRELATE_BY and SETTINGS options out of them.
+   *
+   * @param expressions arguments of the FUNNELCOUNT call
+   * @throws IllegalArgumentException if an argument is not a known option, a required option is missing or empty,
+   *                                  or a setting is unknown
+   */
+  public FunnelCountAggregationFunctionFactory(List<ExpressionContext> expressions) {
+    _expressions = expressions;
+    Option.validate(expressions);
+    _correlateByExpressions = Option.CORRELATE_BY.getInputExpressions(expressions);
+    _primaryCorrelationCol = _correlateByExpressions.get(0);
+    _stepExpressions = Option.STEPS.getInputExpressions(expressions);
+    _numSteps = _stepExpressions.size();
+
+    final List<String> settings = Option.SETTINGS.getLiterals(expressions);
+    Setting.validate(settings);
+    _setSetting = Setting.SET.isSet(settings);
+    _partitionSetting = Setting.PARTITIONED.isSet(settings);
+    _sortingSetting = Setting.SORTED.isSet(settings);
+    _thetaSketchSetting = Setting.THETA_SKETCH.isSet(settings);
+    _nominalEntries = Setting.NOMINAL_ENTRIES.getInteger(settings).orElse(ThetaUtil.DEFAULT_NOMINAL_ENTRIES);
+  }
+
+  /**
+   * Builds the aggregation function for the configured settings. 'partitioned' is checked first, then
+   * 'theta_sketch', then 'set'; the bitmap strategy is the fallback.
+   */
+  @Override
+  public AggregationFunction get() {
+    if (_partitionSetting) {
+      if (_thetaSketchSetting) {
+        // theta_sketch && partitioned
+        return createPartitionedFunnelCountAggregationFunction(thetaSketchAggregationStrategy(),
+            thetaSketchPartitionedResultExtractionStrategy(), partitionedMergeStrategy());
+      } else {
+        // partitioned && !theta_sketch
+        return createPartitionedFunnelCountAggregationFunction(bitmapAggregationStrategy(),
+            bitmapPartitionedResultExtractionStrategy(), partitionedMergeStrategy());
+      }
+    } else {
+      if (_thetaSketchSetting) {
+        // theta_sketch && !partitioned
+        return createFunnelCountAggregationFunction(thetaSketchAggregationStrategy(),
+            thetaSketchResultExtractionStrategy(), thetaSketchMergeStrategy());
+      } else if (_setSetting) {
+        // set && !partitioned && !theta_sketch
+        return createFunnelCountAggregationFunction(bitmapAggregationStrategy(), setResultExtractionStrategy(),
+            setMergeStrategy());
+      } else {
+        // default (bitmap)
+        // !partitioned && !theta_sketch && !set
+        return createFunnelCountAggregationFunction(bitmapAggregationStrategy(), bitmapResultExtractionStrategy(),
+            bitmapMergeStrategy());
+      }
+    }
+  }
+
+  private <A, I> FunnelCountAggregationFunction<A, I> createFunnelCountAggregationFunction(
+      AggregationStrategy<A> aggregationStrategy, ResultExtractionStrategy<A, I> resultExtractionStrategy,
+      MergeStrategy<I> mergeStrategy) {
+    return new FunnelCountAggregationFunction<>(_expressions, _stepExpressions, _correlateByExpressions,
+        aggregationStrategy, resultExtractionStrategy, mergeStrategy);
+  }
+
+  /**
+   * Creates the partitioned flavour of the function; the 'sorted' setting selects the sorted implementation.
+   */
+  private <A> FunnelCountAggregationFunction<A, List<Long>> createPartitionedFunnelCountAggregationFunction(
+      AggregationStrategy<A> aggregationStrategy, ResultExtractionStrategy<A, List<Long>> resultExtractionStrategy,
+      MergeStrategy<List<Long>> mergeStrategy) {
+    if (_sortingSetting) {
+      return new FunnelCountSortedAggregationFunction<>(_expressions, _stepExpressions, _correlateByExpressions,
+          aggregationStrategy, resultExtractionStrategy, mergeStrategy);
+    } else {
+      return new FunnelCountAggregationFunction<>(_expressions, _stepExpressions, _correlateByExpressions,
+          aggregationStrategy, resultExtractionStrategy, mergeStrategy);
+    }
+  }
+
+  AggregationStrategy<UpdateSketch[]> thetaSketchAggregationStrategy() {
+    return new ThetaSketchAggregationStrategy(_stepExpressions, _correlateByExpressions, _nominalEntries);
+  }
+
+  AggregationStrategy<DictIdsWrapper> bitmapAggregationStrategy() {
+    return new BitmapAggregationStrategy(_stepExpressions, _correlateByExpressions);
+  }
+
+  MergeStrategy<List<Sketch>> thetaSketchMergeStrategy() {
+    return new ThetaSketchMergeStrategy(_numSteps, _nominalEntries);
+  }
+
+  MergeStrategy<List<Set>> setMergeStrategy() {
+    return new SetMergeStrategy(_numSteps);
+  }
+
+  MergeStrategy<List<RoaringBitmap>> bitmapMergeStrategy() {
+    return new BitmapMergeStrategy(_numSteps);
+  }
+
+  MergeStrategy<List<Long>> partitionedMergeStrategy() {
+    return new PartitionedMergeStrategy(_numSteps);
+  }
+
+  ResultExtractionStrategy<UpdateSketch[], List<Sketch>> thetaSketchResultExtractionStrategy() {
+    return new ThetaSketchResultExtractionStrategy(_numSteps);
+  }
+
+  ResultExtractionStrategy<DictIdsWrapper, List<Set>> setResultExtractionStrategy() {
+    return new SetResultExtractionStrategy(_numSteps);
+  }
+
+  ResultExtractionStrategy<DictIdsWrapper, List<RoaringBitmap>> bitmapResultExtractionStrategy() {
+    return new BitmapResultExtractionStrategy(_numSteps);
+  }
+
+  /**
+   * Partitioned extraction: the final step counts are computed from the step bitmaps right away, by reusing the
+   * final result extraction of the bitmap merge strategy.
+   */
+  ResultExtractionStrategy<DictIdsWrapper, List<Long>> bitmapPartitionedResultExtractionStrategy() {
+    final MergeStrategy<List<RoaringBitmap>> bitmapMergeStrategy = bitmapMergeStrategy();
+    return dictIdsWrapper -> bitmapMergeStrategy.extractFinalResult(Arrays.asList(dictIdsWrapper._stepsBitmaps));
+  }
+
+  /**
+   * Partitioned extraction: the final step counts are computed from the step sketches right away, by reusing the
+   * final result extraction of the theta sketch merge strategy.
+   */
+  ResultExtractionStrategy<UpdateSketch[], List<Long>> thetaSketchPartitionedResultExtractionStrategy() {
+    final MergeStrategy<List<Sketch>> thetaSketchMergeStrategy = thetaSketchMergeStrategy();
+    return sketches -> thetaSketchMergeStrategy.extractFinalResult(Arrays.asList(sketches));
+  }
+
+  /**
+   * Top level arguments of FUNNELCOUNT; each one is a function expression identified by its function name.
+   */
+  enum Option {
+    STEPS("steps"), CORRELATE_BY("correlateby"), SETTINGS("settings");
+
+    final String _name;
+
+    Option(String name) {
+      _name = name;
+    }
+
+    /**
+     * Rejects the arguments if any of them matches none of the known options.
+     */
+    public static void validate(List<ExpressionContext> expressions) {
+      final List<String> invalidOptions = expressions.stream()
+          .filter(expression -> Arrays.stream(Option.values()).noneMatch(option -> option.matches(expression)))
+          .map(ExpressionContext::toString).collect(Collectors.toList());
+
+      if (!invalidOptions.isEmpty()) {
+        throw new IllegalArgumentException("Invalid FUNNELCOUNT options: " + String.join(", ", invalidOptions));
+      }
+    }
+
+    boolean matches(ExpressionContext expression) {
+      if (expression.getType() != ExpressionContext.Type.FUNCTION) {
+        return false;
+      }
+      return _name.equals(expression.getFunction().getFunctionName());
+    }
+
+    Optional<ExpressionContext> find(List<ExpressionContext> expressions) {
+      return expressions.stream().filter(this::matches).findFirst();
+    }
+
+    /**
+     * Returns the arguments of the first occurrence of this option, which must exist and have at least one argument.
+     */
+    public List<ExpressionContext> getInputExpressions(List<ExpressionContext> expressions) {
+      final List<ExpressionContext> inputExpressions =
+          this.find(expressions).map(exp -> exp.getFunction().getArguments())
+              .orElseThrow(() -> new IllegalArgumentException("FUNNELCOUNT requires " + _name));
+      Preconditions.checkArgument(!inputExpressions.isEmpty(), "FUNNELCOUNT: " + _name + " requires an argument.");
+      return inputExpressions;
+    }
+
+    /**
+     * Returns the literal arguments of the first occurrence of this option as strings, or an empty list if the
+     * option is absent. All arguments must be literals.
+     */
+    public List<String> getLiterals(List<ExpressionContext> expressions) {
+      List<ExpressionContext> inputExpressions =
+          find(expressions).map(exp -> exp.getFunction().getArguments()).orElseGet(Collections::emptyList);
+      Preconditions.checkArgument(
+          inputExpressions.stream().allMatch(exp -> exp.getType() == ExpressionContext.Type.LITERAL),
+          "FUNNELCOUNT: " + _name + " parameters must be literals");
+      return inputExpressions.stream().map(exp -> exp.getLiteral().getStringValue()).collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * Entries of the SETTINGS option, given either as a bare flag ('sorted') or as key=value ('nominalEntries=4096').
+   * Matching ignores case and whitespace.
+   */
+  enum Setting {
+    SET("set"),
+    BITMAP("bitmap"),
+    PARTITIONED("partitioned"),
+    SORTED("sorted"),
+    THETA_SKETCH("theta_sketch"),
+    NOMINAL_ENTRIES("nominalEntries");
+
+    private static final char KEY_VALUE_SEPARATOR = '=';
+    final String _name;
+
+    Setting(String name) {
+      _name = name.toLowerCase();
+    }
+
+    /**
+     * Rejects the settings if any of them matches none of the known settings, neither as flag nor as key=value.
+     */
+    public static void validate(List<String> settings) {
+      final List<String> invalidSettings = settings.stream().filter(param -> Arrays.stream(Setting.values())
+          .noneMatch(setting -> setting.matchesKV(param) || setting.matches(param))).collect(Collectors.toList());
+
+      if (!invalidSettings.isEmpty()) {
+        throw new IllegalArgumentException("Invalid FUNNELCOUNT SETTINGS: " + String.join(", ", invalidSettings));
+      }
+    }
+
+    boolean matchesKV(String setting) {
+      return StringUtils.deleteWhitespace(setting).toLowerCase().startsWith(_name + KEY_VALUE_SEPARATOR);
+    }
+
+    boolean matches(String setting) {
+      return StringUtils.deleteWhitespace(setting).toLowerCase().equals(_name);
+    }
+
+    /**
+     * Returns the value of the first key=value entry for this setting, with all whitespace removed.
+     */
+    public Optional<String> getString(List<String> settings) {
+      // matchesKV ignores whitespace, so the value has to be sliced from the whitespace-free form as well;
+      // slicing the raw entry would turn "nominalEntries = 4096" into "= 4096".
+      return settings.stream().filter(this::matchesKV).findFirst()
+          .map(setting -> StringUtils.deleteWhitespace(setting).substring(_name.length() + 1));
+    }
+
+    public Optional<Integer> getInteger(List<String> settings) {
+      return getString(settings).map(Integer::parseInt);
+    }
+
+    /**
+     * A setting is on when it is present as a bare flag, or as key=value with a value that parses to true.
+     */
+    public boolean isSet(List<String> settings) {
+      return settings.stream().anyMatch(this::matches) || getString(settings).map(Boolean::parseBoolean).orElse(false);
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountSortedAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountSortedAggregationFunction.java
new file mode 100644
index 0000000000..ac39461cef
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelCountSortedAggregationFunction.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * The {@code FunnelCountSortedAggregationFunction} calculates the number of conversions for a given correlation
+ * column and a list of steps as boolean expressions.
+ * It leverages a more efficient counting strategy for segments sorted by correlate_by column, falls back to a
+ * regular counting strategy for unsorted segments (e.g. uncommitted segments).
+ *
+ * <p>The sorted path is taken whenever the dictionary of the first CORRELATE_BY expression reports itself as
+ * sorted. On extraction, the path that was taken is recovered from the runtime type of the value stored in the
+ * result holder.
+ *
+ * Example:
+ *   SELECT
+ *    dateTrunc('day', timestamp) AS ts,
+ *    FUNNEL_COUNT(
+ *      STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *      CORRELATE_BY(user_id),
+ *      SETTINGS('partitioned','sorted')
+ *    ) as step_counts
+ *    FROM user_log
+ *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
+ *    GROUP BY 1
+ *
+ * @param <A> aggregation result type of the fallback (unsorted) aggregation strategy
+ */
+public class FunnelCountSortedAggregationFunction<A> extends FunnelCountAggregationFunction<A, List<Long>> {
+  // First CORRELATE_BY expression; its dictionary decides between the sorted and the fallback path.
+  private final ExpressionContext _primaryCorrelationCol;
+  private final AggregationStrategy<SortedAggregationResult> _sortedAggregationStrategy;
+  private final ResultExtractionStrategy<SortedAggregationResult, List<Long>> _sortedResultExtractionStrategy;
+
+  /**
+   * Creates the function with the fallback strategies handed to the superclass and a
+   * {@link SortedAggregationStrategy} built from the same step and correlation expressions.
+   */
+  public FunnelCountSortedAggregationFunction(List<ExpressionContext> expressions,
+      List<ExpressionContext> stepExpressions, List<ExpressionContext> correlateByExpressions,
+      AggregationStrategy<A> aggregationStrategy, ResultExtractionStrategy<A, List<Long>> resultExtractionStrategy,
+      MergeStrategy<List<Long>> mergeStrategy) {
+    super(expressions, stepExpressions, correlateByExpressions, aggregationStrategy, resultExtractionStrategy,
+        mergeStrategy);
+    _sortedAggregationStrategy = new SortedAggregationStrategy(stepExpressions, correlateByExpressions);
+    _sortedResultExtractionStrategy = SortedAggregationResult::extractResult;
+    _primaryCorrelationCol = correlateByExpressions.get(0);
+  }
+
+  /** Aggregates with the sorted strategy when the correlation dictionary is sorted, else with the fallback. */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    if (isSortedDictionary(blockValSetMap)) {
+      _sortedAggregationStrategy.aggregate(length, aggregationResultHolder, blockValSetMap);
+    } else {
+      super.aggregate(length, aggregationResultHolder, blockValSetMap);
+    }
+  }
+
+  /** Single-value group-by variant of {@link #aggregate}, with the same sorted/fallback choice. */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    if (isSortedDictionary(blockValSetMap)) {
+      _sortedAggregationStrategy.aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap);
+    } else {
+      super.aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap);
+    }
+  }
+
+  /** Multi-value group-by variant of {@link #aggregate}, with the same sorted/fallback choice. */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    if (isSortedDictionary(blockValSetMap)) {
+      _sortedAggregationStrategy.aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap);
+    } else {
+      super.aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap);
+    }
+  }
+
+  /** Extracts through the sorted strategy only if the holder actually contains a sorted aggregation result. */
+  @Override
+  public List<Long> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    if (isSortedAggResult(aggregationResultHolder.getResult())) {
+      return _sortedResultExtractionStrategy.extractAggregationResult(aggregationResultHolder);
+    } else {
+      return super.extractAggregationResult(aggregationResultHolder);
+    }
+  }
+
+  /** Group-by variant of {@link #extractAggregationResult}, decided per group key. */
+  @Override
+  public List<Long> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    if (isSortedAggResult(groupByResultHolder.getResult(groupKey))) {
+      return _sortedResultExtractionStrategy.extractGroupByResult(groupByResultHolder, groupKey);
+    } else {
+      return super.extractGroupByResult(groupByResultHolder, groupKey);
+    }
+  }
+
+  private boolean isSortedDictionary(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    return getDictionary(blockValSetMap).isSorted();
+  }
+
+  // A null holder value is not an instance of anything, so it is routed to the fallback extraction.
+  private boolean isSortedAggResult(Object aggResult) {
+    return aggResult instanceof SortedAggregationResult;
+  }
+
+  /**
+   * Returns the dictionary of the primary correlation column.
+   *
+   * @throws IllegalArgumentException if the column has no dictionary
+   */
+  private Dictionary getDictionary(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
+    Preconditions.checkArgument(primaryCorrelationDictionary != null,
+        "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported for sorted setting, "
+            + "please use a dictionary encoded column.");
+    return primaryCorrelationDictionary;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/MergeStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/MergeStrategy.java
new file mode 100644
index 0000000000..47eaa57e6f
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/MergeStrategy.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import javax.annotation.concurrent.ThreadSafe;
+
+
+/**
+ * Interface for cross-segment merge strategy.
+ *
+ * <p>The implementation should be stateless, and can be shared among multiple segments in multiple threads.
+ *
+ * <p>{@code merge} combines two intermediate results into one. An implementation may modify one of its
+ * arguments and return it (as {@code SetMergeStrategy} does), so callers should only rely on the returned value.
+ * {@code extractFinalResult} turns an intermediate result into the final list of counts.
+ *
+ * @param <I> Intermediate result at segment level (extracted from aggregation strategy result).
+ */
+@ThreadSafe
+interface MergeStrategy<I> {
+  I merge(I intermediateResult1, I intermediateResult2);
+
+  LongArrayList extractFinalResult(I intermediateResult);
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/PartitionedMergeStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/PartitionedMergeStrategy.java
new file mode 100644
index 0000000000..15ea02e090
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/PartitionedMergeStrategy.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.List;
+
+
+/**
+ * Merge strategy for intermediate results that already are per-step counts: two results are merged by adding
+ * their counts position by position.
+ *
+ * <p>NOTE(review): presumably selected by the {@code partitioned} setting, where correlation ids are not
+ * expected to span segments - confirm against the factory.
+ */
+class PartitionedMergeStrategy implements MergeStrategy<List<Long>> {
+  protected final int _numSteps;
+
+  PartitionedMergeStrategy(int numSteps) {
+    _numSteps = numSteps;
+  }
+
+  /**
+   * Returns a new list holding {@code a[i] + b[i]} for each of the first {@code _numSteps} positions; neither
+   * argument is modified.
+   */
+  @Override
+  public List<Long> merge(List<Long> a, List<Long> b) {
+    final LongArrayList summed = toLongArrayList(a);
+    // Add directly into the backing array of the copy.
+    final long[] totals = summed.elements();
+    int step = 0;
+    while (step < _numSteps) {
+      totals[step] += b.get(step);
+      step++;
+    }
+    return summed;
+  }
+
+  /** Returns a copy of the counts as a {@link LongArrayList}. */
+  @Override
+  public LongArrayList extractFinalResult(List<Long> intermediateResult) {
+    return toLongArrayList(intermediateResult);
+  }
+
+  // Always hands back a fresh list, so that callers may mutate it without touching the input.
+  private LongArrayList toLongArrayList(List<Long> longList) {
+    if (longList instanceof LongArrayList) {
+      return ((LongArrayList) longList).clone();
+    }
+    return new LongArrayList(longList);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ResultExtractionStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ResultExtractionStrategy.java
new file mode 100644
index 0000000000..7c35251640
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ResultExtractionStrategy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+
+
+/**
+ * Strategy that turns the aggregation result of a segment into its intermediate result.
+ *
+ * <p>The implementation should be stateless, and can be shared among multiple segments in multiple threads.
+ *
+ * @param <A> Aggregation result accumulated across blocks within segment, kept by result holder.
+ * @param <I> Intermediate result at segment level (extracted from aforementioned aggregation result).
+ */
+@ThreadSafe
+interface ResultExtractionStrategy<A, I> {
+
+  /**
+   * Reads the aggregation result kept by the holder and converts it (aggregation only).
+   */
+  default I extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    final A aggregationResult = aggregationResultHolder.getResult();
+    return extractIntermediateResult(aggregationResult);
+  }
+
+  /**
+   * Reads the aggregation result kept by the holder for the given group key and converts it (aggregation
+   * group-by).
+   */
+  default I extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    final A aggregationResult = groupByResultHolder.getResult(groupKey);
+    return extractIntermediateResult(aggregationResult);
+  }
+
+  /**
+   * Converts an aggregation result into the intermediate result. Both default methods pass on whatever the
+   * holder returns without checking it.
+   */
+  I extractIntermediateResult(A aggregationResult);
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetMergeStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetMergeStrategy.java
new file mode 100644
index 0000000000..d5ae4bc987
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetMergeStrategy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.List;
+import java.util.Set;
+
+
+class SetMergeStrategy implements MergeStrategy<List<Set>> {
+  protected final int _numSteps;
+
+  SetMergeStrategy(int numSteps) {
+    _numSteps = numSteps;
+  }
+
+  @Override
+  public List<Set> merge(List<Set> intermediateResult1, List<Set> 
intermediateResult2) {
+    for (int i = 0; i < _numSteps; i++) {
+      intermediateResult1.get(i).addAll(intermediateResult2.get(i));
+    }
+    return intermediateResult1;
+  }
+
+  @Override
+  public LongArrayList extractFinalResult(List<Set> stepsSets) {
+    long[] result = new long[_numSteps];
+    result[0] = stepsSets.get(0).size();
+    for (int i = 1; i < _numSteps; i++) {
+      // intersect this step with previous step
+      stepsSets.get(i).retainAll(stepsSets.get(i - 1));
+      result[i] = stepsSets.get(i).size();
+    }
+    return LongArrayList.wrap(result);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetResultExtractionStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetResultExtractionStrategy.java
new file mode 100644
index 0000000000..09ad7adad8
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SetResultExtractionStrategy.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Result extraction strategy producing one value set per step, to be combined with set algebra
+ * (unions/intersections).
+ *
+ * <p>The per-step dictionary id bitmaps are resolved to actual values through the segment dictionary.
+ */
+class SetResultExtractionStrategy implements ResultExtractionStrategy<DictIdsWrapper, List<Set>> {
+  protected final int _numSteps;
+
+  SetResultExtractionStrategy(int numSteps) {
+    _numSteps = numSteps;
+  }
+
+  /**
+   * Builds, for each step bitmap of the wrapper, the set of dictionary values it refers to.
+   *
+   * <p>NOTE(review): a {@code null} wrapper is not handled here, unlike the theta sketch extraction strategy,
+   * which returns empty sketches - confirm whether a holder can be empty on this path.
+   */
+  @Override
+  public List<Set> extractIntermediateResult(DictIdsWrapper dictIdsWrapper) {
+    final Dictionary valueDictionary = dictIdsWrapper._dictionary;
+    final List<Set> valueSets = new ArrayList<>(_numSteps);
+    for (RoaringBitmap stepDictIds : dictIdsWrapper._stepsBitmaps) {
+      valueSets.add(convertToValueSet(valueDictionary, stepDictIds));
+    }
+    return valueSets;
+  }
+
+  /**
+   * Looks up every dictionary id of the bitmap and collects the values into a set matching the dictionary's
+   * value type.
+   *
+   * @throws IllegalArgumentException if the value type is not INT, LONG, FLOAT, DOUBLE or STRING
+   */
+  private Set convertToValueSet(Dictionary dictionary, RoaringBitmap dictIdBitmap) {
+    final int expectedSize = dictIdBitmap.getCardinality();
+    final PeekableIntIterator dictIds = dictIdBitmap.getIntIterator();
+    final FieldSpec.DataType valueType = dictionary.getValueType();
+    switch (valueType) {
+      case INT: {
+        final IntOpenHashSet values = new IntOpenHashSet(expectedSize);
+        while (dictIds.hasNext()) {
+          values.add(dictionary.getIntValue(dictIds.next()));
+        }
+        return values;
+      }
+      case LONG: {
+        final LongOpenHashSet values = new LongOpenHashSet(expectedSize);
+        while (dictIds.hasNext()) {
+          values.add(dictionary.getLongValue(dictIds.next()));
+        }
+        return values;
+      }
+      case FLOAT: {
+        final FloatOpenHashSet values = new FloatOpenHashSet(expectedSize);
+        while (dictIds.hasNext()) {
+          values.add(dictionary.getFloatValue(dictIds.next()));
+        }
+        return values;
+      }
+      case DOUBLE: {
+        final DoubleOpenHashSet values = new DoubleOpenHashSet(expectedSize);
+        while (dictIds.hasNext()) {
+          values.add(dictionary.getDoubleValue(dictIds.next()));
+        }
+        return values;
+      }
+      case STRING: {
+        final ObjectOpenHashSet<String> values = new ObjectOpenHashSet<>(expectedSize);
+        while (dictIds.hasNext()) {
+          values.add(dictionary.getStringValue(dictIds.next()));
+        }
+        return values;
+      }
+      default:
+        throw new IllegalArgumentException("Illegal data type for FUNNEL_COUNT aggregation function: " + valueType);
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationResult.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationResult.java
new file mode 100644
index 0000000000..eb773eac7e
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationResult.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+
+
+/**
+ * Aggregation result data structure leveraged by sorted aggregation strategy.
+ *
+ * <p>Step hits are expected grouped by correlation id: a change of correlation id closes the current group,
+ * whose reached steps are then counted, and opens the next one.
+ */
+class SortedAggregationResult {
+  final int _numSteps;
+  // Per step, number of closed correlation groups that reached this step and all steps before it.
+  final long[] _stepCounters;
+  // Per step, whether the currently open correlation group has hit this step.
+  final boolean[] _correlatedSteps;
+  int _lastCorrelationId = Integer.MIN_VALUE;
+
+  SortedAggregationResult(int numSteps) {
+    _numSteps = numSteps;
+    _stepCounters = new long[_numSteps];
+    _correlatedSteps = new boolean[_numSteps];
+  }
+
+  /**
+   * Records that the given step was hit for the given correlation id.
+   *
+   * @param step index of the step that matched
+   * @param correlationId id of the correlation group the row belongs to
+   */
+  public void add(int step, int correlationId) {
+    if (correlationId != _lastCorrelationId) {
+      // End of correlation group, calculate funnel conversion counts
+      incrStepCounters();
+
+      // initialize next correlation group
+      for (int n = 0; n < _numSteps; n++) {
+        _correlatedSteps[n] = false;
+      }
+      _lastCorrelationId = correlationId;
+    }
+    _correlatedSteps[step] = true;
+  }
+
+  /** Counts the open correlation group: increments each step counter up to the first step that was not hit. */
+  void incrStepCounters() {
+    for (int n = 0; n < _numSteps; n++) {
+      if (!_correlatedSteps[n]) {
+        break;
+      }
+      _stepCounters[n]++;
+    }
+  }
+
+  /**
+   * Returns the step counts including the correlation group that is still open.
+   *
+   * <p>The open group is counted on a copy of the counters. Extracting therefore leaves this object untouched:
+   * calling this method more than once does not count the open group repeatedly, and the returned list does not
+   * share its array with this object.
+   */
+  public LongArrayList extractResult() {
+    final long[] result = _stepCounters.clone();
+    // count last correlation id left open
+    for (int n = 0; n < _numSteps && _correlatedSteps[n]; n++) {
+      result[n]++;
+    }
+    return LongArrayList.wrap(result);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationStrategy.java
new file mode 100644
index 0000000000..533d8723a7
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationStrategy.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Aggregation strategy for segments partitioned and sorted by the main 
correlation column.
+ */
+class SortedAggregationStrategy extends 
AggregationStrategy<SortedAggregationResult> {
+  public SortedAggregationStrategy(List<ExpressionContext> stepExpressions,
+      List<ExpressionContext> correlateByExpressions) {
+    super(stepExpressions, correlateByExpressions);
+  }
+
+  @Override
+  public SortedAggregationResult createAggregationResult(Dictionary 
dictionary) {
+    return new SortedAggregationResult(_numSteps);
+  }
+
+  @Override
+  void add(Dictionary dictionary, SortedAggregationResult aggResult, int step, 
int correlationId) {
+    aggResult.add(step, correlationId);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchAggregationStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchAggregationStrategy.java
new file mode 100644
index 0000000000..a2ac25f867
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchAggregationStrategy.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import java.util.List;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Aggregation strategy leveraging theta sketch algebra (unions/intersections).
+ *
+ * <p>One update sketch is kept per step; each hit feeds the dictionary value behind the correlation id into the
+ * sketch of the step that matched.
+ */
+class ThetaSketchAggregationStrategy extends AggregationStrategy<UpdateSketch[]> {
+  final UpdateSketchBuilder _updateSketchBuilder;
+
+  /**
+   * @param nominalEntries nominal entries setting applied to every sketch built by this strategy
+   */
+  public ThetaSketchAggregationStrategy(List<ExpressionContext> stepExpressions,
+      List<ExpressionContext> correlateByExpressions, int nominalEntries) {
+    super(stepExpressions, correlateByExpressions);
+    _updateSketchBuilder = new UpdateSketchBuilder().setNominalEntries(nominalEntries);
+  }
+
+  /** Creates one empty update sketch per step; the dictionary argument is ignored. */
+  @Override
+  public UpdateSketch[] createAggregationResult(Dictionary dictionary) {
+    final UpdateSketch[] stepsSketches = new UpdateSketch[_numSteps];
+    for (int n = 0; n < _numSteps; n++) {
+      stepsSketches[n] = _updateSketchBuilder.build();
+    }
+    return stepsSketches;
+  }
+
+  /**
+   * Updates the sketch of the given step with the dictionary value of the given correlation id.
+   *
+   * @throws IllegalStateException if the dictionary value type is not INT, LONG, FLOAT, DOUBLE or STRING
+   */
+  @Override
+  void add(Dictionary dictionary, UpdateSketch[] stepsSketches, int step, int correlationId) {
+    final UpdateSketch sketch = stepsSketches[step];
+    switch (dictionary.getValueType()) {
+      case INT:
+        sketch.update(dictionary.getIntValue(correlationId));
+        break;
+      case LONG:
+        sketch.update(dictionary.getLongValue(correlationId));
+        break;
+      case FLOAT:
+        sketch.update(dictionary.getFloatValue(correlationId));
+        break;
+      case DOUBLE:
+        sketch.update(dictionary.getDoubleValue(correlationId));
+        break;
+      case STRING:
+        sketch.update(dictionary.getStringValue(correlationId));
+        break;
+      default:
+        // The clause is spelled CORRELATE_BY in queries; name it exactly so the message can be acted upon.
+        throw new IllegalStateException("Illegal CORRELATE_BY column data type for FUNNEL_COUNT aggregation function: "
+            + dictionary.getValueType());
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchMergeStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchMergeStrategy.java
new file mode 100644
index 0000000000..80378d7078
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchMergeStrategy.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.datasketches.theta.Intersection;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+
+
+/**
+ * Merge strategy for intermediate results made of one theta sketch per step: sketches are united across
+ * segments and intersected along the funnel to estimate the final counts.
+ */
+class ThetaSketchMergeStrategy implements MergeStrategy<List<Sketch>> {
+  protected final int _numSteps;
+  final SetOperationBuilder _setOperationBuilder;
+
+  ThetaSketchMergeStrategy(int numSteps, int nominalEntries) {
+    _numSteps = numSteps;
+    _setOperationBuilder = new SetOperationBuilder().setNominalEntries(nominalEntries);
+  }
+
+  /** Returns a new list holding, for each step, the union of the two sketches of that step. */
+  @Override
+  public List<Sketch> merge(List<Sketch> sketches1, List<Sketch> sketches2) {
+    final List<Sketch> unions = new ArrayList<>(_numSteps);
+    for (int step = 0; step < _numSteps; step++) {
+      final Sketch left = sketches1.get(step);
+      final Sketch right = sketches2.get(step);
+      // NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
+      //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
+      unions.add(_setOperationBuilder.buildUnion().union(left, right, false, null));
+    }
+    return unions;
+  }
+
+  /**
+   * Estimates the funnel counts: the rounded estimate of the first sketch, then for every later step the
+   * rounded estimate of the running intersection with that step's sketch.
+   */
+  @Override
+  public LongArrayList extractFinalResult(List<Sketch> sketches) {
+    final long[] estimates = new long[_numSteps];
+
+    Sketch funnel = sketches.get(0);
+    estimates[0] = Math.round(funnel.getEstimate());
+    int step = 1;
+    while (step < _numSteps) {
+      funnel = _setOperationBuilder.buildIntersection().intersect(funnel, sketches.get(step));
+      estimates[step] = Math.round(funnel.getEstimate());
+      step++;
+    }
+    return LongArrayList.wrap(estimates);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchResultExtractionStrategy.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchResultExtractionStrategy.java
new file mode 100644
index 0000000000..2c794271bf
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/ThetaSketchResultExtractionStrategy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
+
+
+/**
+ * Result extraction strategy exposing the per-step update sketches as the intermediate result.
+ */
+class ThetaSketchResultExtractionStrategy implements ResultExtractionStrategy<UpdateSketch[], List<Sketch>> {
+  // Shared placeholder used for every step when there is no aggregation result.
+  private static final Sketch EMPTY_SKETCH = new UpdateSketchBuilder().build().compact();
+
+  protected final int _numSteps;
+
+  ThetaSketchResultExtractionStrategy(int numSteps) {
+    _numSteps = numSteps;
+  }
+
+  /**
+   * Returns the sketches as a list view over the given array, or {@code _numSteps} references to one empty
+   * sketch when the array is {@code null}.
+   */
+  @Override
+  public List<Sketch> extractIntermediateResult(UpdateSketch[] stepsSketches) {
+    if (stepsSketches != null) {
+      return Arrays.asList(stepsSketches);
+    }
+    return Collections.nCopies(_numSteps, EMPTY_SKETCH);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseFunnelCountQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseFunnelCountQueriesTest.java
index ef5c7d596f..c1a966a9b5 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseFunnelCountQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseFunnelCountQueriesTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.queries;
 
-import it.unimi.dsi.fastutil.longs.LongArrayList;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -87,8 +86,12 @@ abstract public class BaseFunnelCountQueriesTest extends BaseQueriesTest {
   private List<IndexSegment> _indexSegments;
 
   protected abstract int getExpectedNumEntriesScannedInFilter();
+  protected abstract int getExpectedInterSegmentMultiplier();
   protected abstract TableConfig getTableConfig();
   protected abstract IndexSegment buildSegment(List<GenericRow> records) throws Exception;
+  protected abstract void assertIntermediateResult(Object intermediateResult, long[] expectedCounts);
+
+  protected abstract String getSettings();
 
   @Override
   protected String getFilter() {
@@ -132,13 +135,16 @@ abstract public class BaseFunnelCountQueriesTest extends BaseQueriesTest {
     return records;
   }
 
+  private String getFunnelCountSql() {
+    return "FUNNEL_COUNT( "
+        + "STEPS(stepColumn = 'A', stepColumn = 'B'), "
+        + "CORRELATE_BY(idColumn), "
+        + getSettings()
+        + ") ";
+  }
   @Test
   public void testAggregationOnly() {
-    String query = String.format("SELECT "
-        + "FUNNEL_COUNT("
-        + " STEPS(stepColumn = 'A', stepColumn = 'B'),"
-        + " CORRELATE_BY(idColumn)"
-        + ") FROM testTable");
+    String query = String.format("SELECT " + getFunnelCountSql() + "FROM testTable");
 
     // Inner segment
     Predicate<Integer> filter = id -> id >= FILTER_LIMIT;
@@ -157,13 +163,11 @@ abstract public class BaseFunnelCountQueriesTest extends BaseQueriesTest {
     List<Object> aggregationResult = resultsBlock.getResults();
     assertNotNull(aggregationResult);
     assertEquals(aggregationResult.size(), 1);
-    for (int i = 0; i < 2; i++) {
-      assertEquals(((LongArrayList) aggregationResult.get(0)).getLong(i), expectedResult[i]);
-    }
+    assertIntermediateResult(aggregationResult.get(0), expectedResult);
 
-    // Inter segments (expect 4 * inner segment result)
+    // Inter segments
     for (int i = 0; i < 2; i++) {
-      expectedResult[i] = 4 * expectedResult[i];
+      expectedResult[i] = expectedResult[i] * getExpectedInterSegmentMultiplier();
     }
     Object[] expectedResults = { expectedResult };
 
@@ -176,10 +180,8 @@ abstract public class BaseFunnelCountQueriesTest extends BaseQueriesTest {
   public void testAggregationGroupBy() {
     String query = String.format("SELECT "
         + "MOD(idColumn, %s), "
-        + "FUNNEL_COUNT("
-        + " STEPS(stepColumn = 'A', stepColumn = 'B'),"
-        + " CORRELATE_BY(idColumn)"
-        + ") FROM testTable "
+        + getFunnelCountSql()
+        + "FROM testTable "
         + "WHERE idColumn >= %s "
         + "GROUP BY 1 ORDER BY 1 LIMIT %s", NUM_GROUPS, FILTER_LIMIT, NUM_GROUPS);
 
@@ -219,19 +221,20 @@ abstract public class BaseFunnelCountQueriesTest extends BaseQueriesTest {
       numGroups++;
       GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
       int key = ((Double) groupKey._keys[0]).intValue();
-      assertEquals(aggregationGroupByResult.getResultForGroupId(0, groupKey._groupId),
-          new LongArrayList(expectedResult[key]));
+      assertIntermediateResult(
+            aggregationGroupByResult.getResultForGroupId(0, groupKey._groupId),
+            expectedResult[key]);
     }
     assertEquals(numGroups, expectedNumGroups);
 
-    // Inter segments (expect 4 * inner segment result)
+    // Inter segments
     List<Object[]> expectedRows = new ArrayList<>();
     for (int i = 0; i < NUM_GROUPS; i++) {
       if (expectedResult[i] == null) {
         continue;
       }
       for (int step = 0; step < 2; step++) {
-          expectedResult[i][step] = 4 * expectedResult[i][step];
+          expectedResult[i][step] = expectedResult[i][step] * getExpectedInterSegmentMultiplier();
       }
       Object[] expectedRow = { Double.valueOf(i), expectedResult[i] };
       expectedRows.add(expectedRow);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesBitmapTest.java
similarity index 58%
copy from pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
copy to pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesBitmapTest.java
index c89a5d74c9..1e36838f3b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesBitmapTest.java
@@ -25,19 +25,33 @@ import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.roaringbitmap.RoaringBitmap;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 
 /**
  * Queries test for FUNNEL_COUNT queries.
  */
 @SuppressWarnings("rawtypes")
-public class FunnelCountQueriesNonSortedTest extends BaseFunnelCountQueriesTest {
+public class FunnelCountQueriesBitmapTest extends BaseFunnelCountQueriesTest {
+
+  @Override
+  protected String getSettings() {
+    return "SETTINGS('bitmap')";
+  }
 
   @Override
   protected int getExpectedNumEntriesScannedInFilter() {
     return NUM_RECORDS;
   }
 
+  @Override
+  protected int getExpectedInterSegmentMultiplier() {
+    return 1;
+  }
+
   @Override
   protected TableConfig getTableConfig() {
     return TABLE_CONFIG_BUILDER.build();
@@ -46,12 +60,24 @@ public class FunnelCountQueriesNonSortedTest extends BaseFunnelCountQueriesTest
   @Override
   protected IndexSegment buildSegment(List<GenericRow> records)
       throws Exception {
-    MutableSegment mutableSegment = MutableSegmentImplTestUtils
-        .createMutableSegmentImpl(SCHEMA, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
-            false);
+    MutableSegment mutableSegment =
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, Collections.emptySet(), Collections.emptySet(),
+            Collections.emptySet(), false);
     for (GenericRow record : records) {
       mutableSegment.index(record, null);
     }
     return mutableSegment;
   }
+
+  @Override
+  protected void assertIntermediateResult(Object intermediateResult, long[] expectedCounts) {
+    assertTrue(intermediateResult instanceof List);
+    List<RoaringBitmap> bitmaps = (List<RoaringBitmap>) intermediateResult;
+    // First step should match
+    assertEquals(Math.round(bitmaps.get(0).getCardinality()), expectedCounts[0]);
+    for (int i = 1; i < bitmaps.size(); i++) {
+      // Sets are yet to be intersected, we check that they are at least the size of the expected counts at this stage.
+      assertTrue(Math.round(bitmaps.get(i).getCardinality()) >= expectedCounts[i]);
+    }
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSortedTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedSortedTest.java
similarity index 79%
rename from pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSortedTest.java
rename to pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedSortedTest.java
index f06fe26637..a453886050 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSortedTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedSortedTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.queries;
 
+import it.unimi.dsi.fastutil.longs.LongArrayList;
 import java.io.File;
 import java.util.Comparator;
 import java.util.List;
@@ -30,18 +31,31 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.ReadMode;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 
 /**
  * Queries test for FUNNEL_COUNT queries using sorted strategy.
  */
 @SuppressWarnings("rawtypes")
-public class FunnelCountQueriesSortedTest extends BaseFunnelCountQueriesTest {
+public class FunnelCountQueriesPartitionedSortedTest extends BaseFunnelCountQueriesTest {
+
+  @Override
+  protected String getSettings() {
+    return "SETTINGS('partitioned', 'sorted')";
+  }
 
   @Override
   protected int getExpectedNumEntriesScannedInFilter() {
     return 0;
   }
 
+  @Override
+  protected int getExpectedInterSegmentMultiplier() {
+    return 4;
+  }
+
   @Override
   protected TableConfig getTableConfig() {
     return TABLE_CONFIG_BUILDER.setSortedColumn(ID_COLUMN).build();
@@ -62,4 +76,10 @@ public class FunnelCountQueriesSortedTest extends BaseFunnelCountQueriesTest {
     driver.build();
     return ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
   }
+
+  @Override
+  protected void assertIntermediateResult(Object intermediateResult, long[] expectedCounts) {
+    assertTrue(intermediateResult instanceof LongArrayList);
+    assertEquals(((LongArrayList) intermediateResult).elements(), expectedCounts);
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedTest.java
similarity index 66%
copy from pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
copy to pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedTest.java
index c89a5d74c9..da3f436b58 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesPartitionedTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.queries;
 
+import it.unimi.dsi.fastutil.longs.LongArrayList;
 import java.util.Collections;
 import java.util.List;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImplTestUtils;
@@ -26,18 +27,31 @@ import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 
 /**
  * Queries test for FUNNEL_COUNT queries.
  */
 @SuppressWarnings("rawtypes")
-public class FunnelCountQueriesNonSortedTest extends BaseFunnelCountQueriesTest {
+public class FunnelCountQueriesPartitionedTest extends BaseFunnelCountQueriesTest {
+
+  @Override
+  protected String getSettings() {
+    return "SETTINGS('partitioned')";
+  }
 
   @Override
   protected int getExpectedNumEntriesScannedInFilter() {
     return NUM_RECORDS;
   }
 
+  @Override
+  protected int getExpectedInterSegmentMultiplier() {
+    return 4;
+  }
+
   @Override
   protected TableConfig getTableConfig() {
     return TABLE_CONFIG_BUILDER.build();
@@ -46,12 +60,18 @@ public class FunnelCountQueriesNonSortedTest extends BaseFunnelCountQueriesTest
   @Override
   protected IndexSegment buildSegment(List<GenericRow> records)
       throws Exception {
-    MutableSegment mutableSegment = MutableSegmentImplTestUtils
-        .createMutableSegmentImpl(SCHEMA, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
-            false);
+    MutableSegment mutableSegment =
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, Collections.emptySet(), Collections.emptySet(),
+            Collections.emptySet(), false);
     for (GenericRow record : records) {
       mutableSegment.index(record, null);
     }
     return mutableSegment;
   }
+
+  @Override
+  protected void assertIntermediateResult(Object intermediateResult, long[] expectedCounts) {
+    assertTrue(intermediateResult instanceof LongArrayList);
+    assertEquals(((LongArrayList) intermediateResult).elements(), expectedCounts);
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSetTest.java
similarity index 60%
copy from pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
copy to pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSetTest.java
index c89a5d74c9..b9361bc618 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesSetTest.java
@@ -20,24 +20,38 @@ package org.apache.pinot.queries;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImplTestUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 
 /**
  * Queries test for FUNNEL_COUNT queries.
  */
 @SuppressWarnings("rawtypes")
-public class FunnelCountQueriesNonSortedTest extends BaseFunnelCountQueriesTest {
+public class FunnelCountQueriesSetTest extends BaseFunnelCountQueriesTest {
+
+  @Override
+  protected String getSettings() {
+    return "SETTINGS('set')";
+  }
 
   @Override
   protected int getExpectedNumEntriesScannedInFilter() {
     return NUM_RECORDS;
   }
 
+  @Override
+  protected int getExpectedInterSegmentMultiplier() {
+    return 1;
+  }
+
   @Override
   protected TableConfig getTableConfig() {
     return TABLE_CONFIG_BUILDER.build();
@@ -46,12 +60,24 @@ public class FunnelCountQueriesNonSortedTest extends BaseFunnelCountQueriesTest
   @Override
   protected IndexSegment buildSegment(List<GenericRow> records)
       throws Exception {
-    MutableSegment mutableSegment = MutableSegmentImplTestUtils
-        .createMutableSegmentImpl(SCHEMA, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
-            false);
+    MutableSegment mutableSegment =
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, Collections.emptySet(), Collections.emptySet(),
+            Collections.emptySet(), false);
     for (GenericRow record : records) {
       mutableSegment.index(record, null);
     }
     return mutableSegment;
   }
+
+  @Override
+  protected void assertIntermediateResult(Object intermediateResult, long[] expectedCounts) {
+    assertTrue(intermediateResult instanceof List);
+    List<Set> sets = (List<Set>) intermediateResult;
+    // First step should match
+    assertEquals(Math.round(sets.get(0).size()), expectedCounts[0]);
+    for (int i = 1; i < sets.size(); i++) {
+      // Sets are yet to be intersected, we check that they are at least the size of the expected counts at this stage.
+      assertTrue(Math.round(sets.get(i).size()) >= expectedCounts[i]);
+    }
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesThetaSketchTest.java
similarity index 58%
rename from pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
rename to pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesThetaSketchTest.java
index c89a5d74c9..ef55153280 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesNonSortedTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FunnelCountQueriesThetaSketchTest.java
@@ -20,24 +20,33 @@ package org.apache.pinot.queries;
 
 import java.util.Collections;
 import java.util.List;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImplTestUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 
 /**
  * Queries test for FUNNEL_COUNT queries.
  */
 @SuppressWarnings("rawtypes")
-public class FunnelCountQueriesNonSortedTest extends BaseFunnelCountQueriesTest {
+public class FunnelCountQueriesThetaSketchTest extends BaseFunnelCountQueriesTest {
 
   @Override
   protected int getExpectedNumEntriesScannedInFilter() {
     return NUM_RECORDS;
   }
 
+  @Override
+  protected int getExpectedInterSegmentMultiplier() {
+    return 1;
+  }
+
   @Override
   protected TableConfig getTableConfig() {
     return TABLE_CONFIG_BUILDER.build();
@@ -46,12 +55,29 @@ public class FunnelCountQueriesNonSortedTest extends BaseFunnelCountQueriesTest
   @Override
   protected IndexSegment buildSegment(List<GenericRow> records)
       throws Exception {
-    MutableSegment mutableSegment = MutableSegmentImplTestUtils
-        .createMutableSegmentImpl(SCHEMA, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
-            false);
+    MutableSegment mutableSegment =
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, Collections.emptySet(), Collections.emptySet(),
+            Collections.emptySet(), false);
     for (GenericRow record : records) {
       mutableSegment.index(record, null);
     }
     return mutableSegment;
   }
+
+  @Override
+  protected void assertIntermediateResult(Object intermediateResult, long[] expectedCounts) {
+    assertTrue(intermediateResult instanceof List);
+    List<Sketch> sketches = (List<Sketch>) intermediateResult;
+    // First step should match
+    assertEquals(Math.round(sketches.get(0).getEstimate()), expectedCounts[0]);
+    for (int i = 1; i < sketches.size(); i++) {
+      // Sets are yet to be intersected, we check that they are at least the size of the expected counts at this stage.
+      assertTrue(Math.round(sketches.get(i).getEstimate()) >= expectedCounts[i]);
+    }
+  }
+
+  @Override
+  protected String getSettings() {
+    return "SETTINGS('theta_sketch')";
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to