[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-04-25 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r278700494
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java
 ##
 @@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.movingaverage.averagers.Averager;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link MovingAverageIterable} iterates over days {@link RowBucket}, 
producing rows for each dimension combination,
+ * filling in missing entries with "empty" rows so that the averaging buckets 
have enough data to operate on.
+ * It then computes the moving average on the buckets and returns the row.
+ * See {@link MovingAverageIterator#computeMovingAverage(Map, Row, boolean)} 
for more details.
+ */
+public class MovingAverageIterable implements Iterable
+{
+
+  private final Sequence seq;
+  private final List dims;
+  private final List> factories;
+  private final Map postAggMap;
+  private final Map aggMap;
+  private final Map fakeEvents;
+
+  public MovingAverageIterable(
+  Sequence buckets,
+  List dims,
+  List> factories,
+  List postAggList,
+  List aggList
+  )
+  {
+this.dims = dims;
+this.factories = factories;
+this.seq = buckets;
+
+postAggMap = postAggList.stream().collect(Collectors.toMap(postAgg -> 
postAgg.getName(), postAgg -> postAgg));
+aggMap = aggList.stream().collect(Collectors.toMap(agg -> agg.getName(), 
agg -> agg));
+fakeEvents = generateFakeEventsFromAggregators(aggMap, postAggMap);
+  }
+
+  // Build a list of dummy events from Aggregators/PostAggregators to be used 
by Iterator to build fake rows.
+  // These fake rows will be used by computeMovingAverage() in skip=true mode.
+  // See fakeEventsCopy in internalNext() and computeMovingAverage() 
documentation.
+  private Map generateFakeEventsFromAggregators(Map aggMap,
+Map postAggMap)
+  {
+Map fakeEvents = new LinkedHashMap<>();
+aggMap.values().forEach(agg -> {
+  Aggregator aggFactorized = 
agg.factorize(getEmptyColumnSelectorFactory());
+  fakeEvents.put(agg.getName(), aggFactorized.get());
+});
+postAggMap.values().forEach(postAgg -> fakeEvents.put(postAgg.getName(), 
postAgg.compute(fakeEvents)));
+return fakeEvents;
+  }
+
+  @Nonnull
+  private ColumnSelectorFactory getEmptyColumnSelectorFactory()
+  {
+return new ColumnSelectorFactory()
+{
+  @Override
+  public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+  {
+// Generating empty records while aggregating on Filtered aggregators 
requires a dimension selector
+// for initialization.  This dimension selector is not actually used 
for generating values
+return DimensionSelector.constant(null);
+  }
+
+  

[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-04-19 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r277082193
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/test/resources/runtime.properties
 ##
 @@ -0,0 +1,2 @@
+druid.processing.buffer.sizeBytes=655360
 
 Review comment:
   I added the property (`druid.processing.buffer.sizeBytes=655360`) to 
`MovingAverageQueryTest` and removed runtime.properties.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-04-19 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r277081869
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java
 ##
 @@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.movingaverage.averagers.Averager;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link MovingAverageIterable} iterates over days {@link RowBucket}, 
producing rows for each dimension combination,
+ * filling in missing entries with "empty" rows so that the averaging buckets 
have enough data to operate on.
+ * It then computes the moving average on the buckets and returns the row.
+ * See {@link MovingAverageIterator#computeMovingAverage(Map, Row, boolean)} 
for more details.
+ */
+public class MovingAverageIterable implements Iterable
+{
+
+  private final Sequence seq;
+  private final List dims;
+  private final List> factories;
+  private final Map postAggMap;
+  private final Map aggMap;
+  private final Map fakeEvents;
 
 Review comment:
   Renamed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-19 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r267088298
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucketIterable.java
 ##
 @@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * It is the iterable used to bucket data into days,
+ * doing appropriate lookahead to see if the next row is in the same day or a 
new day.
+ */
+public class RowBucketIterable implements Iterable
+{
+
+  public final Sequence seq;
+  private List intervals;
+  private Period period;
+
+  public RowBucketIterable(Sequence seq, List intervals, Period 
period)
+  {
+this.seq = seq;
+this.period = period;
+this.intervals = intervals;
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Iterable#iterator()
+   */
+  @Override
+  public Iterator iterator()
+  {
+return new RowBucketIterator(seq, intervals, period);
+  }
+
+  static class RowBucketIterator implements Iterator
+  {
+private Yielder yielder;
+private DateTime endTime;
+private DateTime expectedBucket;
+private Period period;
+private int intervalIndex = 0;
+private List intervals;
+private boolean processedLastRow = false;
+private boolean processedExtraRow = false;
+
+public RowBucketIterator(Sequence rows, List intervals, 
Period period)
+{
+  this.period = period;
+  this.intervals = intervals;
+  expectedBucket = intervals.get(intervalIndex).getStart();
+  endTime = intervals.get(intervals.size() - 1).getEnd();
+  yielder = rows.toYielder(null, new BucketingAccumulator());
+}
+
+/* (non-Javadoc)
+ * @see java.util.Iterator#hasNext()
+ */
+@Override
+public boolean hasNext()
+{
+  return expectedBucket.compareTo(endTime) < 0;
+}
+
+/* (non-Javadoc)
+ * @see java.util.Iterator#next()
+ */
+@Override
+public RowBucket next()
+{
+  RowBucket currentBucket = yielder.get();
+
+  if (expectedBucket.compareTo(intervals.get(intervalIndex).getEnd()) >= 
0) {
+intervalIndex++;
+if (intervalIndex < intervals.size()) {
+  expectedBucket = intervals.get(intervalIndex).getStart();
+}
+  }
+  // currentBucket > expectedBucket
+  if (currentBucket != null && 
currentBucket.getDateTime().compareTo(expectedBucket) > 0) {
+currentBucket = new RowBucket(expectedBucket, Collections.emptyList());
+expectedBucket = expectedBucket.plus(period);
+return currentBucket;
+  }
+
+  if (!yielder.isDone()) {
+// standard case. return regular row
+yielder = yielder.next(currentBucket);
+expectedBucket = expectedBucket.plus(period);
+return currentBucket;
 
 Review comment:
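   `hasNext()` is correct since the interval's upper bound is exclusive (intervals are half-open, `[start, end)`), so a bucket starting exactly at `endTime` must not be emitted.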


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-19 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266774086
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java
 ##
 @@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.movingaverage.averagers.Averager;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link MovingAverageIterable} iterates over days {@link RowBucket}, 
producing rows for each dimension combination,
+ * filling in missing entries with "empty" rows so that the averaging buckets 
have enough data to operate on.
+ * It then computes the moving average on the buckets and returns the row.
+ * See {@link MovingAverageIterator#computeMovingAverage(Map, Row, boolean)} 
for more details.
+ */
+public class MovingAverageIterable implements Iterable
+{
+
+  private final Sequence seq;
+  private final List dims;
+  private final List> factories;
+  private final Map postAggMap;
+  private final Map aggMap;
+  private final Map fakeEvents;
+
+  public MovingAverageIterable(
+  Sequence buckets,
+  List dims,
+  List> factories,
+  List postAggList,
+  List aggList
+  )
+  {
+this.dims = dims;
+this.factories = factories;
+this.seq = buckets;
+
+postAggMap = postAggList.stream().collect(Collectors.toMap(postAgg -> 
postAgg.getName(), postAgg -> postAgg));
+aggMap = aggList.stream().collect(Collectors.toMap(agg -> agg.getName(), 
agg -> agg));
+
+ColumnSelectorFactory colFact = new ColumnSelectorFactory()
+{
+  @Override
+  public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+  {
+// Generating empty records while aggregating on Filtered aggregators 
requires a dimension selector
+// for initialization.  This dimension selector is not actually used 
for generating values
+return DimensionSelector.constant(null);
+  }
+
+  @Override
+  public ColumnValueSelector makeColumnValueSelector(String s)
+  {
+return null;
+  }
+
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String s)
+  {
+return null;
+  }
+};
+// Fill in all the fake events
+fakeEvents = new LinkedHashMap<>();
+aggMap.values().forEach(agg -> {
+  Aggregator aggFactorized = agg.factorize(colFact);
+  fakeEvents.put(agg.getName(), aggFactorized.get());
+});
+postAggMap.values().forEach(postAgg -> fakeEvents.put(postAgg.getName(), 
postAgg.compute(fakeEvents)));
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Iterable#iterator()
+   */
+  @Override
+  public Iterator iterator()
+  {
+return new MovingAverageIterator(seq, dims, factories, fakeEvents, aggMap);
+  }
+
+  static class MovingAverageIterator implements Iterator
+  {
+
+private final List dims;
+private final Map, List>> averagers = new 
HashMap<>();
+private final List> factories;
+
+private Yielder yielder;
+private RowBucket cache = null;
+

[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266701884
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageIterableTest.java
 ##
 @@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.query.movingaverage.averagers.ConstantAveragerFactory;
+import org.apache.druid.query.movingaverage.averagers.LongMeanAveragerFactory;
+import org.joda.time.DateTime;
+import org.joda.time.chrono.ISOChronology;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *
+ */
+public class MovingAverageIterableTest
+{
+  private static final DateTime JAN_1 = new DateTime(2017, 1, 1, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_2 = new DateTime(2017, 1, 2, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_3 = new DateTime(2017, 1, 3, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_4 = new DateTime(2017, 1, 4, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_5 = new DateTime(2017, 1, 5, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_6 = new DateTime(2017, 1, 6, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_7 = new DateTime(2017, 1, 7, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+
+  private static final String GENDER = "gender";
+  private static final String AGE = "age";
+  private static final String COUNTRY = "country";
+
+  private static final Map dims1 = new HashMap<>();
+  private static final Map dims2 = new HashMap<>();
+  private static final Map dims3 = new HashMap<>();
+
+  static {
+dims1.put(GENDER, "m");
+dims1.put(AGE, "10");
+dims1.put(COUNTRY, "US");
+
+dims2.put(GENDER, "f");
+dims2.put(AGE, "8");
+dims2.put(COUNTRY, "US");
+
+dims3.put(GENDER, "u");
+dims3.put(AGE, "5");
+dims3.put(COUNTRY, "UK");
+  }
+
+  @Test
+  public void testNext()
+  {
+
+Collection dims = Arrays.asList(
+new DefaultDimensionSpec(GENDER, GENDER),
+new DefaultDimensionSpec(AGE, AGE),
+new DefaultDimensionSpec(COUNTRY, COUNTRY)
+);
+
+Sequence dayBuckets = Sequences.simple(Arrays.asList(
+new RowBucket(JAN_1, Arrays.asList(
+new MapBasedRow(JAN_1, dims1),
+new MapBasedRow(JAN_1, dims2)
+)),
+new RowBucket(JAN_2, Collections.singletonList(
+new MapBasedRow(JAN_2, dims1)
+)),
+new RowBucket(JAN_3, Collections.emptyList()),
+new RowBucket(JAN_4, Arrays.asList(
+new MapBasedRow(JAN_4, dims2),
+new MapBasedRow(JAN_4, dims3)
+))
+));
+

[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266701830
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageIterableTest.java
 ##
 @@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.query.movingaverage.averagers.ConstantAveragerFactory;
+import org.apache.druid.query.movingaverage.averagers.LongMeanAveragerFactory;
+import org.joda.time.DateTime;
+import org.joda.time.chrono.ISOChronology;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *
+ */
+public class MovingAverageIterableTest
+{
+  private static final DateTime JAN_1 = new DateTime(2017, 1, 1, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_2 = new DateTime(2017, 1, 2, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_3 = new DateTime(2017, 1, 3, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_4 = new DateTime(2017, 1, 4, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_5 = new DateTime(2017, 1, 5, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_6 = new DateTime(2017, 1, 6, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_7 = new DateTime(2017, 1, 7, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+
+  private static final String GENDER = "gender";
+  private static final String AGE = "age";
+  private static final String COUNTRY = "country";
+
+  private static final Map dims1 = new HashMap<>();
+  private static final Map dims2 = new HashMap<>();
+  private static final Map dims3 = new HashMap<>();
+
+  static {
+dims1.put(GENDER, "m");
+dims1.put(AGE, "10");
+dims1.put(COUNTRY, "US");
+
+dims2.put(GENDER, "f");
+dims2.put(AGE, "8");
+dims2.put(COUNTRY, "US");
+
+dims3.put(GENDER, "u");
+dims3.put(AGE, "5");
+dims3.put(COUNTRY, "UK");
+  }
+
+  @Test
+  public void testNext()
+  {
+
+Collection dims = Arrays.asList(
+new DefaultDimensionSpec(GENDER, GENDER),
+new DefaultDimensionSpec(AGE, AGE),
+new DefaultDimensionSpec(COUNTRY, COUNTRY)
+);
+
+Sequence dayBuckets = Sequences.simple(Arrays.asList(
+new RowBucket(JAN_1, Arrays.asList(
+new MapBasedRow(JAN_1, dims1),
+new MapBasedRow(JAN_1, dims2)
+)),
+new RowBucket(JAN_2, Collections.singletonList(
+new MapBasedRow(JAN_2, dims1)
+)),
+new RowBucket(JAN_3, Collections.emptyList()),
+new RowBucket(JAN_4, Arrays.asList(
+new MapBasedRow(JAN_4, dims2),
+new MapBasedRow(JAN_4, dims3)
+))
+));
+

[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266695618
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java
 ##
 @@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.movingaverage.averagers.Averager;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link MovingAverageIterable} iterates over days {@link RowBucket}, 
producing rows for each dimension combination,
+ * filling in missing entries with "empty" rows so that the averaging buckets 
have enough data to operate on.
+ * It then computes the moving average on the buckets and returns the row.
+ * See {@link MovingAverageIterator#computeMovingAverage(Map, Row, boolean)} 
for more details.
+ */
+public class MovingAverageIterable implements Iterable
+{
+
+  private final Sequence seq;
+  private final Collection dims;
+  private final Collection> factories;
+  private final Map postAggMap;
+  private final Map aggMap;
+  private final Map fakeEvents;
+
+  public MovingAverageIterable(
+  Sequence buckets,
+  Collection dims,
+  Collection> factories,
+  List postAggList,
+  List aggList
+  )
+  {
+this.dims = dims;
+this.factories = factories;
+this.seq = buckets;
+
+postAggMap = postAggList.stream().collect(Collectors.toMap(postAgg -> 
postAgg.getName(), postAgg -> postAgg));
+aggMap = aggList.stream().collect(Collectors.toMap(agg -> agg.getName(), 
agg -> agg));
+
+ColumnSelectorFactory colFact = new ColumnSelectorFactory()
+{
+  @Override
+  public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+  {
+// Generating empty records while aggregating on Filtered aggregators 
requires a dimension selector
+// for initialization.  This dimension selector is not actually used 
for generating values
+return DimensionSelector.constant(null);
+  }
+
+  @Override
+  public ColumnValueSelector makeColumnValueSelector(String s)
+  {
+return null;
+  }
+
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String s)
+  {
+return null;
+  }
+};
+// Fill in all the fake events
+fakeEvents = new LinkedHashMap<>();
+aggMap.values().forEach(agg -> {
+  Aggregator aggFactorized = agg.factorize(colFact);
+  fakeEvents.put(agg.getName(), aggFactorized.get());
+});
+postAggMap.values().forEach(postAgg -> fakeEvents.put(postAgg.getName(), 
postAgg.compute(fakeEvents)));
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Iterable#iterator()
+   */
+  @Override
+  public Iterator iterator()
+  {
+return new MovingAverageIterator(seq, dims, factories, fakeEvents, aggMap);
+  }
+
+  static class MovingAverageIterator implements Iterator
+  {
+
+private final Collection dims;
+private final Map, Collection>> averagers 
= new HashMap<>();
+private final Collection> factories;
+
+private Yielder yielder;
+

[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266670736
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAverager.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMaxAverager extends BaseAverager
+{
+
+  private int startFrom = 0;
 
 Review comment:
   Added a comment (see commit `e2a5317`).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266671016
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java
 ##
 @@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.movingaverage.averagers.Averager;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link MovingAverageIterable} iterates over days {@link RowBucket}, 
producing rows for each dimension combination,
+ * filling in missing entries with "empty" rows so that the averaging buckets 
have enough data to operate on.
+ * It then computes the moving average on the buckets and returns the row.
+ * See {@link MovingAverageIterator#computeMovingAverage(Map, Row, boolean)} 
for more details.
+ */
+public class MovingAverageIterable implements Iterable
+{
+
+  private final Sequence seq;
+  private final Collection dims;
+  private final Collection> factories;
+  private final Map postAggMap;
+  private final Map aggMap;
+  private final Map fakeEvents;
+
+  public MovingAverageIterable(
+  Sequence buckets,
+  Collection dims,
+  Collection> factories,
+  List postAggList,
+  List aggList
+  )
+  {
+this.dims = dims;
+this.factories = factories;
+this.seq = buckets;
+
+postAggMap = postAggList.stream().collect(Collectors.toMap(postAgg -> 
postAgg.getName(), postAgg -> postAgg));
+aggMap = aggList.stream().collect(Collectors.toMap(agg -> agg.getName(), 
agg -> agg));
+
+ColumnSelectorFactory colFact = new ColumnSelectorFactory()
+{
+  @Override
+  public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+  {
+// Generating empty records while aggregating on Filtered aggregators 
requires a dimension selector
+// for initialization.  This dimension selector is not actually used 
for generating values
+return DimensionSelector.constant(null);
+  }
+
+  @Override
+  public ColumnValueSelector makeColumnValueSelector(String s)
+  {
+return null;
+  }
+
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String s)
+  {
+return null;
+  }
+};
+// Fill in all the fake events
+fakeEvents = new LinkedHashMap<>();
+aggMap.values().forEach(agg -> {
+  Aggregator aggFactorized = agg.factorize(colFact);
+  fakeEvents.put(agg.getName(), aggFactorized.get());
+});
+postAggMap.values().forEach(postAgg -> fakeEvents.put(postAgg.getName(), 
postAgg.compute(fakeEvents)));
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Iterable#iterator()
+   */
+  @Override
+  public Iterator iterator()
+  {
+return new MovingAverageIterator(seq, dims, factories, fakeEvents, aggMap);
+  }
+
+  static class MovingAverageIterator implements Iterator
+  {
+
+private final Collection dims;
+private final Map, Collection>> averagers 
= new HashMap<>();
+private final Collection> factories;
+
+private Yielder yielder;
+

[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266670736
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAverager.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMaxAverager extends BaseAverager
+{
+
+  private int startFrom = 0;
 
 Review comment:
   Added a comment (see commit `e2a531703`).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266670359
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java
 ##
 @@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.movingaverage.averagers.Averager;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link MovingAverageIterable} iterates over days {@link RowBucket}, 
producing rows for each dimension combination,
+ * filling in missing entries with "empty" rows so that the averaging buckets 
have enough data to operate on.
+ * It then computes the moving average on the buckets and returns the row.
+ * See {@link MovingAverageIterator#computeMovingAverage(Map, Row, boolean)} 
for more details.
+ */
+public class MovingAverageIterable implements Iterable
+{
+
+  private final Sequence seq;
+  private final Collection dims;
+  private final Collection> factories;
+  private final Map postAggMap;
+  private final Map aggMap;
+  private final Map fakeEvents;
+
+  public MovingAverageIterable(
+  Sequence buckets,
+  Collection dims,
+  Collection> factories,
+  List postAggList,
+  List aggList
+  )
+  {
+this.dims = dims;
+this.factories = factories;
+this.seq = buckets;
+
+postAggMap = postAggList.stream().collect(Collectors.toMap(postAgg -> 
postAgg.getName(), postAgg -> postAgg));
+aggMap = aggList.stream().collect(Collectors.toMap(agg -> agg.getName(), 
agg -> agg));
+
+ColumnSelectorFactory colFact = new ColumnSelectorFactory()
+{
+  @Override
+  public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+  {
+// Generating empty records while aggregating on Filtered aggregators 
requires a dimension selector
+// for initialization.  This dimension selector is not actually used 
for generating values
+return DimensionSelector.constant(null);
+  }
+
+  @Override
+  public ColumnValueSelector makeColumnValueSelector(String s)
+  {
+return null;
+  }
+
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String s)
+  {
+return null;
+  }
+};
+// Fill in all the fake events
+fakeEvents = new LinkedHashMap<>();
+aggMap.values().forEach(agg -> {
+  Aggregator aggFactorized = agg.factorize(colFact);
+  fakeEvents.put(agg.getName(), aggFactorized.get());
+});
+postAggMap.values().forEach(postAgg -> fakeEvents.put(postAgg.getName(), 
postAgg.compute(fakeEvents)));
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Iterable#iterator()
+   */
+  @Override
+  public Iterator iterator()
+  {
+return new MovingAverageIterator(seq, dims, factories, fakeEvents, aggMap);
+  }
+
+  static class MovingAverageIterator implements Iterator
+  {
+
+private final Collection dims;
+private final Map, Collection>> averagers 
= new HashMap<>();
 
 Review comment:
   Done.


[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266686171
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucketIterable.java
 ##
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * It is the iterable used to bucket data into days,
+ * doing appropriate lookahead to see if the next row is in the same day or a 
new day.
 
 Review comment:
   Added comments; let me know if anything is still unclear.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266686171
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucketIterable.java
 ##
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * It is the iterable used to bucket data into days,
+ * doing appropriate lookahead to see if the next row is in the same day or a 
new day.
 
 Review comment:
   Added comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266674895
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerFactory.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Common base class for AveragerFactories
+ *
+ * @param  Base type that the averager should return as a result
+ * @param  Type that is returned from finalization
+ */
+public abstract class BaseAveragerFactory implements AveragerFactory
+{
+
+  protected String name;
+  protected String fieldName;
+  protected int numBuckets;
+  protected int cycleSize;
+
+  /**
+   * Constructor.
+   *
+   * @param name   Name of the Averager
+   * @param numBuckets Number of buckets in the analysis window
+   * @param fieldName  Field from incoming events to include in the analysis
+   * @param cycleSize  Cycle group size. Used to calculate day-of-week option. 
Default=1 (single element in group).
+   */
+  public BaseAveragerFactory(String name, int numBuckets, String fieldName, 
Integer cycleSize)
+  {
+this.name = name;
+this.numBuckets = numBuckets;
+this.fieldName = fieldName;
+this.cycleSize = (cycleSize != null) ? cycleSize : DEFAULT_PERIOD;
+Preconditions.checkNotNull(name, "Must have a valid, non-null averager 
name");
+Preconditions.checkNotNull(fieldName, "Must have a valid, non-null field 
name");
+Preconditions.checkArgument(this.cycleSize > 0, "Cycle size must be 
greater than zero");
+Preconditions.checkArgument(numBuckets > 0, "Bucket size must be greater 
than zero");
+Preconditions.checkArgument(this.cycleSize < numBuckets, "Cycle size must 
be less than the bucket size");
 
 Review comment:
   Fixed. (new pr)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266674831
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/test/resources/runtime.properties
 ##
 @@ -0,0 +1,58 @@
+druid.broker.cache.unCacheable=["groupBy","segmentMetadata"]
 
 Review comment:
   I do need this file for the tests to succeed, but I removed all the 
properties that are not required.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266674372
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAverager.java
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMeanAverager extends BaseAverager
 
 Review comment:
   This is used for polymorphism by `BaseAverager.addElement`. In fact, if I 
change the type to Long, I get `java.lang.ArrayStoreException: 
java.lang.Integer` from `buckets[index++] = finalMetric;` in `addElement`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266671149
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageIterableTest.java
 ##
 @@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.query.movingaverage.averagers.ConstantAveragerFactory;
+import org.apache.druid.query.movingaverage.averagers.LongMeanAveragerFactory;
+import org.joda.time.DateTime;
+import org.joda.time.chrono.ISOChronology;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *
+ */
+public class MovingAverageIterableTest
+{
+  private static final DateTime JAN_1 = new DateTime(2017, 1, 1, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_2 = new DateTime(2017, 1, 2, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_3 = new DateTime(2017, 1, 3, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_4 = new DateTime(2017, 1, 4, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_5 = new DateTime(2017, 1, 5, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_6 = new DateTime(2017, 1, 6, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+  private static final DateTime JAN_7 = new DateTime(2017, 1, 7, 0, 0, 0, 0, 
ISOChronology.getInstanceUTC());
+
+  private static final String GENDER = "gender";
+  private static final String AGE = "age";
+  private static final String COUNTRY = "country";
+
+  private static final Map dims1 = new HashMap<>();
+  private static final Map dims2 = new HashMap<>();
+  private static final Map dims3 = new HashMap<>();
+
+  static {
+dims1.put(GENDER, "m");
+dims1.put(AGE, "10");
+dims1.put(COUNTRY, "US");
+
+dims2.put(GENDER, "f");
+dims2.put(AGE, "8");
+dims2.put(COUNTRY, "US");
+
+dims3.put(GENDER, "u");
+dims3.put(AGE, "5");
+dims3.put(COUNTRY, "UK");
+  }
+
+  @Test
+  public void testNext()
+  {
+
+Collection dims = Arrays.asList(
+new DefaultDimensionSpec(GENDER, GENDER),
+new DefaultDimensionSpec(AGE, AGE),
+new DefaultDimensionSpec(COUNTRY, COUNTRY)
+);
+
+Sequence dayBuckets = Sequences.simple(Arrays.asList(
+new RowBucket(JAN_1, Arrays.asList(
+new MapBasedRow(JAN_1, dims1),
+new MapBasedRow(JAN_1, dims2)
+)),
+new RowBucket(JAN_2, Collections.singletonList(
+new MapBasedRow(JAN_2, dims1)
+)),
+new RowBucket(JAN_3, Collections.emptyList()),
+new RowBucket(JAN_4, Arrays.asList(
+new MapBasedRow(JAN_4, dims2),
+new MapBasedRow(JAN_4, dims3)
+))
+));
+

[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266671016
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java
 ##
 @@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.movingaverage.averagers.Averager;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link MovingAverageIterable} iterates over days {@link RowBucket}, 
producing rows for each dimension combination,
+ * filling in missing entries with "empty" rows so that the averaging buckets 
have enough data to operate on.
+ * It then computes the moving average on the buckets and returns the row.
+ * See {@link MovingAverageIterator#computeMovingAverage(Map, Row, boolean)} 
for more details.
+ */
+public class MovingAverageIterable implements Iterable
+{
+
+  private final Sequence seq;
+  private final Collection dims;
+  private final Collection> factories;
+  private final Map postAggMap;
+  private final Map aggMap;
+  private final Map fakeEvents;
+
+  public MovingAverageIterable(
+  Sequence buckets,
+  Collection dims,
+  Collection> factories,
+  List postAggList,
+  List aggList
+  )
+  {
+this.dims = dims;
+this.factories = factories;
+this.seq = buckets;
+
+postAggMap = postAggList.stream().collect(Collectors.toMap(postAgg -> 
postAgg.getName(), postAgg -> postAgg));
+aggMap = aggList.stream().collect(Collectors.toMap(agg -> agg.getName(), 
agg -> agg));
+
+ColumnSelectorFactory colFact = new ColumnSelectorFactory()
+{
+  @Override
+  public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+  {
+// Generating empty records while aggregating on Filtered aggregators 
requires a dimension selector
+// for initialization.  This dimension selector is not actually used 
for generating values
+return DimensionSelector.constant(null);
+  }
+
+  @Override
+  public ColumnValueSelector makeColumnValueSelector(String s)
+  {
+return null;
+  }
+
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String s)
+  {
+return null;
+  }
+};
+// Fill in all the fake events
+fakeEvents = new LinkedHashMap<>();
+aggMap.values().forEach(agg -> {
+  Aggregator aggFactorized = agg.factorize(colFact);
+  fakeEvents.put(agg.getName(), aggFactorized.get());
+});
+postAggMap.values().forEach(postAgg -> fakeEvents.put(postAgg.getName(), 
postAgg.compute(fakeEvents)));
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Iterable#iterator()
+   */
+  @Override
+  public Iterator iterator()
+  {
+return new MovingAverageIterator(seq, dims, factories, fakeEvents, aggMap);
+  }
+
+  static class MovingAverageIterator implements Iterator
+  {
+
+private final Collection dims;
+private final Map, Collection>> averagers 
= new HashMap<>();
+private final Collection> factories;
+
+private Yielder yielder;
+

[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266670736
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAverager.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMaxAverager extends BaseAverager
+{
+
+  private int startFrom = 0;
 
 Review comment:
   Added a comment (in the new PR; see commit `03035d1`).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] yurmix commented on a change in pull request #6430: Contributing Moving-Average Query to open source.

2019-03-18 Thread GitBox
yurmix commented on a change in pull request #6430: Contributing Moving-Average 
Query to open source.
URL: https://github.com/apache/incubator-druid/pull/6430#discussion_r266670359
 
 

 ##
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java
 ##
 @@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.movingaverage.averagers.Averager;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link MovingAverageIterable} iterates over days {@link RowBucket}, 
producing rows for each dimension combination,
+ * filling in missing entries with "empty" rows so that the averaging buckets 
have enough data to operate on.
+ * It then computes the moving average on the buckets and returns the row.
+ * See {@link MovingAverageIterator#computeMovingAverage(Map, Row, boolean)} 
for more details.
+ */
+public class MovingAverageIterable implements Iterable
+{
+
+  private final Sequence seq;
+  private final Collection dims;
+  private final Collection> factories;
+  private final Map postAggMap;
+  private final Map aggMap;
+  private final Map fakeEvents;
+
+  public MovingAverageIterable(
+  Sequence buckets,
+  Collection dims,
+  Collection> factories,
+  List postAggList,
+  List aggList
+  )
+  {
+this.dims = dims;
+this.factories = factories;
+this.seq = buckets;
+
+postAggMap = postAggList.stream().collect(Collectors.toMap(postAgg -> 
postAgg.getName(), postAgg -> postAgg));
+aggMap = aggList.stream().collect(Collectors.toMap(agg -> agg.getName(), 
agg -> agg));
+
+ColumnSelectorFactory colFact = new ColumnSelectorFactory()
+{
+  @Override
+  public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+  {
+// Generating empty records while aggregating on Filtered aggregators 
requires a dimension selector
+// for initialization.  This dimension selector is not actually used 
for generating values
+return DimensionSelector.constant(null);
+  }
+
+  @Override
+  public ColumnValueSelector makeColumnValueSelector(String s)
+  {
+return null;
+  }
+
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String s)
+  {
+return null;
+  }
+};
+// Fill in all the fake events
+fakeEvents = new LinkedHashMap<>();
+aggMap.values().forEach(agg -> {
+  Aggregator aggFactorized = agg.factorize(colFact);
+  fakeEvents.put(agg.getName(), aggFactorized.get());
+});
+postAggMap.values().forEach(postAgg -> fakeEvents.put(postAgg.getName(), 
postAgg.compute(fakeEvents)));
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Iterable#iterator()
+   */
+  @Override
+  public Iterator iterator()
+  {
+return new MovingAverageIterator(seq, dims, factories, fakeEvents, aggMap);
+  }
+
+  static class MovingAverageIterator implements Iterator
+  {
+
+private final Collection dims;
+private final Map, Collection>> averagers 
= new HashMap<>();
 
 Review comment:
   Done (committed in the new PR).