This is an automated email from the ASF dual-hosted git repository. mayanks pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 17a3873 Improve performance of DistinctCountThetaSketch by eliminating empty sketches and unions. (#5798) 17a3873 is described below commit 17a38733c600d8c75b440e6ea3280903c7d9e9ec Author: Mayank Shrivastava <maya...@apache.org> AuthorDate: Tue Aug 4 14:04:52 2020 -0700 Improve performance of DistinctCountThetaSketch by eliminating empty sketches and unions. (#5798) * In cases with a large number of predicates in the post-aggregation-expression (with ORs), we tend to end up with a lot of empty sketches (and unions) when not every row matches each predicate. This causes the overhead of creating sketches and unioning them, leading to a potentially huge performance hit. * In this PR, we improve this behavior by: - Filtering out empty unions/sketches when extracting aggregation results. - Carefully merging results in `merge()` with minimal unions (only when necessary). * We could also perform lazy creation of unions (to ensure that they are not empty), but that would mean a hash-map lookup per row. This would penalize the general case, where there are fewer empty unions, so this approach was not taken. * We saw an overall improvement in latency of about 50% for cases with: - A large number of predicates, and - A large number of segments, and - A small number of matches per predicate per segment. 
--- ...istinctCountThetaSketchAggregationFunction.java | 46 +++++++++++++++------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java index 7f02368..99cef39 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java @@ -392,7 +392,12 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF Map<String, Sketch> result = new HashMap<>(); for (PredicateInfo predicateInfo : _predicateInfoMap.values()) { - result.put(predicateInfo.getStringPredicate(), unionMap.get(predicateInfo.getPredicate()).getResult()); + Sketch sketch = unionMap.get(predicateInfo.getPredicate()).getResult(); + + // Skip empty sketches, as they lead to unnecessary unions (and cost performance) + if (!sketch.isEmpty()) { + result.put(predicateInfo.getStringPredicate(), sketch); + } } return result; } @@ -406,7 +411,12 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF Map<String, Sketch> result = new HashMap<>(); for (PredicateInfo predicateInfo : _predicateInfoMap.values()) { - result.put(predicateInfo.getStringPredicate(), unionMap.get(predicateInfo.getPredicate()).getResult()); + Sketch sketch = unionMap.get(predicateInfo.getPredicate()).getResult(); + + // Skip empty sketches, as they lead to unnecessary unions (and cost performance) + if (!sketch.isEmpty()) { + result.put(predicateInfo.getStringPredicate(), sketch); + } } return result; } @@ -419,25 +429,33 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF return intermediateResult1; } - // NOTE: 
Here we parse the map keys to Predicate to handle the non-standard predicate string returned from server - // side for backward-compatibility. - // TODO: Remove the extra parsing after releasing 0.5.0 - Map<Predicate, Union> unionMap = getDefaultUnionMap(); + // Add sketches from intermediateResult1, merged with overlapping ones from intermediateResult2 + Map<String, Sketch> mergedResult = new HashMap<>(); for (Map.Entry<String, Sketch> entry : intermediateResult1.entrySet()) { - Predicate predicate = getPredicate(entry.getKey()); - unionMap.get(predicate).update(entry.getValue()); + String predicate = entry.getKey(); + Sketch sketch = intermediateResult2.get(predicate); + + // Merge the overlapping ones + if (sketch != null) { + Union union = getSetOperationBuilder().buildUnion(); + union.update(entry.getValue()); + union.update(sketch); + mergedResult.put(predicate, union.getResult()); + } else { // Collect the non-overlapping ones + mergedResult.put(predicate, entry.getValue()); + } } + + // Add sketches that are only in intermediateResult2 for (Map.Entry<String, Sketch> entry : intermediateResult2.entrySet()) { - Predicate predicate = getPredicate(entry.getKey()); - unionMap.get(predicate).update(entry.getValue()); - } - Map<String, Sketch> mergedResult = new HashMap<>(); - for (Map.Entry<Predicate, Union> entry : unionMap.entrySet()) { - mergedResult.put(entry.getKey().toString(), entry.getValue().getResult()); + // If key already present, it was already merged in the previous iteration. + mergedResult.putIfAbsent(entry.getKey(), entry.getValue()); } + return mergedResult; } + @Override public boolean isIntermediateResultComparable() { return false; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org