This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.15.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.15.0-incubating by this push: new ea3de8c allow quantiles merge aggregator to also accept doubles (#7718) (#7743) ea3de8c is described below commit ea3de8cac5c82173d0563416679fef2e3e7beeb2 Author: Clint Wylie <cwy...@apache.org> AuthorDate: Thu May 23 17:04:08 2019 -0700 allow quantiles merge aggregator to also accept doubles (#7718) (#7743) * allow quantiles merge aggregator to also accept doubles * consolidate dupe * import --- .../quantiles/DoublesSketchBuildAggregator.java | 3 - .../quantiles/DoublesSketchMergeAggregator.java | 23 +++-- .../DoublesSketchMergeBufferAggregator.java | 11 +-- .../sql/DoublesSketchSqlAggregatorTest.java | 106 +++++++++++++++++---- 4 files changed, 106 insertions(+), 37 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java index bd46fc5..18f94a9 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java @@ -28,14 +28,12 @@ public class DoublesSketchBuildAggregator implements Aggregator { private final ColumnValueSelector<Double> valueSelector; - private final int size; private UpdateDoublesSketch sketch; public DoublesSketchBuildAggregator(final ColumnValueSelector<Double> valueSelector, final int size) { this.valueSelector = valueSelector; - this.size = size; sketch = DoublesSketch.builder().setK(size).build(); } @@ -68,5 +66,4 @@ public class DoublesSketchBuildAggregator implements Aggregator { sketch = null; } - } diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java index 325a6f2..4598048 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java @@ -27,10 +27,10 @@ import org.apache.druid.segment.ColumnValueSelector; public class DoublesSketchMergeAggregator implements Aggregator { - private final ColumnValueSelector<DoublesSketch> selector; + private final ColumnValueSelector selector; private DoublesUnion union; - public DoublesSketchMergeAggregator(final ColumnValueSelector<DoublesSketch> selector, final int k) + public DoublesSketchMergeAggregator(final ColumnValueSelector selector, final int k) { this.selector = selector; union = DoublesUnion.builder().setMaxK(k).build(); @@ -39,13 +39,10 @@ public class DoublesSketchMergeAggregator implements Aggregator @Override public synchronized void aggregate() { - final DoublesSketch sketch = selector.getObject(); - if (sketch == null) { - return; - } - union.update(sketch); + updateUnion(selector, union); } + @Override public synchronized Object get() { @@ -70,4 +67,16 @@ public class DoublesSketchMergeAggregator implements Aggregator union = null; } + static void updateUnion(ColumnValueSelector selector, DoublesUnion union) + { + final Object object = selector.getObject(); + if (object == null) { + return; + } + if (object instanceof DoublesSketch) { + union.update((DoublesSketch) object); + } else { + union.update(selector.getDouble()); + } + } } diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java index ffe9009..f5a1e9d 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java @@ -20,7 +20,6 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; import com.yahoo.memory.WritableMemory; -import com.yahoo.sketches.quantiles.DoublesSketch; import com.yahoo.sketches.quantiles.DoublesUnion; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; @@ -34,14 +33,14 @@ import java.util.IdentityHashMap; public class DoublesSketchMergeBufferAggregator implements BufferAggregator { - private final ColumnValueSelector<DoublesSketch> selector; + private final ColumnValueSelector selector; private final int k; private final int maxIntermediateSize; private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>(); private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>(); public DoublesSketchMergeBufferAggregator( - final ColumnValueSelector<DoublesSketch> selector, + final ColumnValueSelector selector, final int k, final int maxIntermediateSize) { @@ -62,12 +61,8 @@ public class DoublesSketchMergeBufferAggregator implements BufferAggregator @Override public synchronized void aggregate(final ByteBuffer buffer, final int position) { - final DoublesSketch sketch = selector.getObject(); - if (sketch == null) { - return; - } final DoublesUnion union = unions.get(buffer).get(position); - union.update(sketch); + 
DoublesSketchMergeAggregator.updateUnion(selector, union); } @Override diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java index e51c2de..a3382f4 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java @@ -125,25 +125,26 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase CalciteTests.getJsonMapper().registerModule(mod); } - final QueryableIndex index = IndexBuilder.create() - .tmpDir(temporaryFolder.newFolder()) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt"), - new DoubleSumAggregatorFactory("m1", "m1"), - new DoublesSketchAggregatorFactory( - "qsketch_m1", - "m1", - 128 - ) - ) - .withRollup(false) - .build() - ) - .rows(CalciteTests.ROWS1) - .buildMMappedIndex(); + final QueryableIndex index = + IndexBuilder.create() + .tmpDir(temporaryFolder.newFolder()) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withMetrics( + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + new DoublesSketchAggregatorFactory( + "qsketch_m1", + "m1", + 128 + ) + ) + .withRollup(false) + .build() + ) + .rows(CalciteTests.ROWS1) + .buildMMappedIndex(); walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( DataSegment.builder() @@ -401,6 +402,73 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase ); 
} + @Test + public void testQuantileOnInnerQuantileQuery() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n" + + "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1"; + + + final List<Object[]> results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + + ImmutableList.Builder<Object[]> builder = ImmutableList.builder(); + builder.add(new Object[]{"", 1.0}); + builder.add(new Object[]{"1", 4.0}); + builder.add(new Object[]{"10.1", 2.0}); + builder.add(new Object[]{"2", 3.0}); + builder.add(new Object[]{"abc", 6.0}); + builder.add(new Object[]{"def", 5.0}); + final List<Object[]> expectedResults = builder.build(); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); + } + + // Verify query + Assert.assertEquals( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + ImmutableList.of( + new DoublesSketchAggregatorFactory("a0:agg", "m1", 128) + ) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.5f) + ) + ) + .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build() + ) + ) + .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING)) + .setAggregatorSpecs( + new 
DoublesSketchAggregatorFactory("_a0:agg", "a0", 128) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new DoublesSketchToQuantilePostAggregator("_a0", makeFieldAccessPostAgg("_a0:agg"), 0.5f) + ) + ) + .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + private static PostAggregator makeFieldAccessPostAgg(String name) { return new FieldAccessPostAggregator(name, name); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org