The results we get back from the server-side scan are already the partial aggregated values we need. GroupedAggregatingResultIterator will collapse adjacent Tuples together which happen to have the same row key. I'm not sure we want/need this to happen. Instead I think we just need to decode the aggregated values directly from the result of the scan.
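Roughly, the direct decoding could look something like the sketch below. This is only an illustration, not actual PhoenixRecordReader code: resultIterator, decodeAggregateCell and emitPartialRow are hypothetical placeholders, and the Phoenix calls are approximations.

    // Sketch only: consume the raw scan results and decode each partial
    // aggregate directly, without re-grouping through
    // GroupedAggregatingResultIterator. Helper names are hypothetical.
    Tuple tuple;
    while ((tuple = resultIterator.next()) != null) {
        // The group-by key comes back encoded in the row key of the tuple.
        ImmutableBytesWritable groupKey = new ImmutableBytesWritable();
        tuple.getKey(groupKey);

        // Each tuple carries the serialized partial-aggregate state produced
        // by the region server for that group.
        byte[] partialState = decodeAggregateCell(tuple);   // hypothetical helper

        // Hand the pair to Drill as one partial result row; the final merge
        // happens in Drill's second aggregation phase.
        emitPartialRow(groupKey, partialState);              // hypothetical helper
    }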
On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <maryann....@gmail.com> wrote:

> Hi James,
>
> bq. A few questions for you: not sure I understand the changes you made to PhoenixRecordReader. Is it necessary to wrap the server-side scan results in a GroupedAggregatingResultIterator? Each server-side scan will produce results with a single tuple per group by key. In Phoenix, the GroupedAggregatingResultIterator's function in life is to do the final merge. Note too that the results that come back from the aggregated scan aren't sorted (while GroupedAggregatingResultIterator needs tuples sorted by the group by key). Or is this just to help in decoding the values coming back from the scan?
>
> It is necessary. I suppose what we should return as a partial result from PhoenixRecordReader is exactly the same as what we return in standalone Phoenix+Calcite, except that the result is partial, or say incomplete. For example, if we have "select a, count(*) from t group by a", we should return rows that have "a" as the first expression value and "count(*)" as the second expression value. For this "count" expression, it actually needs a ClientAggregator for evaluation, and that's what this GroupedAggregatingResultIterator is used for.
> Since "each server-side scan will produce results with a single tuple per group by key", and PhoenixRecordReader is only dealing with one server-side result each time, we don't care how the group-by keys are arranged (ordered or unordered). Actually, GroupedAggregatingResultIterator is not the group-by iterator we use for AggregatePlan. It does not "combine". It treats every row as a different group, by returning its rowkey as the group-by key (GroupedAggregatingResultIterator.java:56).
>
> In short, this iterator is for decoding the server-side values. So we may want to optimize this logic by removing this serialization and deserialization and having only one set of aggregators in the future.
>
> bq. Also, not sure what impact it has on the way we "combine" the scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as each of our scans could include duplicate group by keys. Is it ok to combine them in this case?
>
> It should not matter, or at least it is not related to the problem I'm now having.
>
> bq. One more question: how is the group by key communicated back to Drill?
>
> According to the HashAggPrule, if it decides to create a two-phase aggregate, the first phase is now handled by Phoenix (after applying the PhoenixHashAggPrule). I assume the partial results then get shuffled based on the hash of their group-by keys (returned by PhoenixRecordReader). The final step is the Drill hash aggregation.
>
> This is my test table "A.BEER", which has four columns, "B", "E1", "E2", "R", all of INTEGER type. The data is generated like this:
>
>     for (x = 1 to N) { // currently N=1000
>       UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>     }
>
> The group-by query for testing is "SELECT e1, count(*) FROM a.beer GROUP BY e1".
> The expected result should be:
>
>     0 100
>     1 100
>     2 100
>     3 100
>     4 100
>     5 100
>     6 100
>     7 100
>     8 100
>     9 100
>
> The actual result was:
>
>     6 0
>     7 0
>     8 0
>     9 0
>     0 0
>     1 100
>     2 100
>     3 100
>     4 100
>     5 100
>
> Here I just tried another one, "SELECT e2, count(*) FROM a.beer GROUP BY e2".
> Similarly, the expected result should have group-by keys from 0 to 99, each having a value of 10 as the count, while the actual result was: from group-by key 86 to 99, together with 0, the count values were all 0; the rest (1 to 85) all had the correct value 10.
>
> Looks to me like the scans were good but there was a problem with one of the hash buckets.
>
> Thanks,
> Maryann
>
>
> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <jamestay...@apache.org> wrote:
>
>> Nice progress, Maryann.
>>
>> A few questions for you: not sure I understand the changes you made to PhoenixRecordReader. Is it necessary to wrap the server-side scan results in a GroupedAggregatingResultIterator? Each server-side scan will produce results with a single tuple per group by key. In Phoenix, the GroupedAggregatingResultIterator's function in life is to do the final merge. Note too that the results that come back from the aggregated scan aren't sorted (while GroupedAggregatingResultIterator needs tuples sorted by the group by key). Or is this just to help in decoding the values coming back from the scan?
>>
>> Also, not sure what impact it has on the way we "combine" the scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as each of our scans could include duplicate group by keys. Is it ok to combine them in this case?
>>
>> One more question: how is the group by key communicated back to Drill?
>>
>> Thanks,
>> James
>>
>>
>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann....@gmail.com> wrote:
>>
>>> Added a few fixes in the pull request. Tested with two regions; it turned out that half of the result is empty (count = 0).
>>> Not sure if there's anything wrong with https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java.
>>> Like Julian said, this rule looks a bit hacky.
>>>
>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>
>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>> index b911f6b..58bc918 100644
>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>    // If any of the aggregate functions are not one of these, then we
>>>    // currently won't generate a 2 phase plan.
>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>> -      return false;
>>> -    }
>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>> +//      return false;
>>> +//    }
>>>
>>>    for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>      String name = aggCall.getAggregation().getName();
>>>
>>> Thanks,
>>> Maryann
>>>
>>>
>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org> wrote:
>>>
>>>> Drill's current approach seems adequate for Drill alone, but extending it to a heterogeneous system that includes Phoenix seems like a hack.
>>>>
>>>> I think you should only create Prels for algebra nodes that you know for sure are going to run on the Drill engine. If there's a possibility that it would run in another engine such as Phoenix then they should still be logical.
>>>>
>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann....@gmail.com> wrote:
>>>> > The partial aggregate seems to be working now, with one interface extension and one bug fix in the Phoenix project. Will do some code cleanup and create a pull request soon.
>>>> >
>>>> > There is still a hack in the Drill project which I made to force 2-phase aggregation. I'll try to fix that.
>>>> >
>>>> > Jacques, I have one question though: how can I verify that there is more than one slice and that the shuffle happens?
>>>> >
>>>> > Thanks,
>>>> > Maryann
>>>> >
>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <jamestay...@apache.org> wrote:
>>>> >
>>>> >> Maryann,
>>>> >> I believe Jacques mentioned that a little bit of refactoring is required for a merge sort to occur - there's something that does that, but it's not expected to be used in this context currently.
>>>> >>
>>>> >> IMHO, there's clearer value in getting the aggregation to use Phoenix first, so I'd recommend going down that road as Jacques mentioned above if possible. Once that's working, we can circle back to the partial sort.
>>>> >>
>>>> >> Thoughts?
>>>> >> James
>>>> >>
>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <maryann....@gmail.com> wrote:
>>>> >>
>>>> >>> I actually tried implementing partial sort with https://github.com/jacques-n/drill/pull/4, which I figured might be a little easier to start with than partial aggregation. But I found that even though the code worked (returned the right results), the Drill-side sort turned out to be an ordinary sort instead of the merge it should have been. Any idea of how to fix that?
>>>> >>>
>>>> >>> Thanks,
>>>> >>> Maryann
>>>> >>>
>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <jacq...@dremio.com> wrote:
>>>> >>>
>>>> >>>> Right now this type of work is done here:
>>>> >>>>
>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>> >>>>
>>>> >>>> With Distribution Trait application here:
>>>> >>>>
>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>> >>>>
>>>> >>>> To me, the easiest way to solve the Phoenix issue is by providing a rule that matches HashAgg and StreamAgg but requires the Phoenix convention as input. It would apply everywhere but would only be plannable when it is the first phase of aggregation.
>>>> >>>>
>>>> >>>> Thoughts?
>>>> >>>>
>>>> >>>> --
>>>> >>>> Jacques Nadeau
>>>> >>>> CTO and Co-Founder, Dremio
>>>> >>>>
>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org> wrote:
>>>> >>>>
>>>> >>>>> Phoenix is able to perform quite a few relational operations on the region server: scan, filter, project, aggregate, sort (optionally with limit). However, the sort and aggregate are necessarily "local". They can only deal with data on that region server, and there needs to be a further operation to combine the results from the region servers.
>>>> >>>>>
>>>> >>>>> The question is how to plan such queries. I think the answer is an AggregateExchangeTransposeRule.
>>>> >>>>>
>>>> >>>>> The rule would spot an Aggregate on a data source that is split into multiple locations (partitions) and split it into a partial Aggregate that computes sub-totals and a summarizing Aggregate that combines those totals.
>>>> >>>>>
>>>> >>>>> How does the planner know that the Aggregate needs to be split? Since the data's distribution has changed, there would need to be an Exchange operator. It is the Exchange operator that triggers the rule to fire.
>>>> >>>>>
>>>> >>>>> There are some special cases. If the data is sorted as well as partitioned (say, because the local aggregate uses a sort-based algorithm) we could maybe use a more efficient plan. And if the partition key is the same as the aggregation key we don't need a summarizing Aggregate, just a Union.
>>>> >>>>>
>>>> >>>>> It turns out not to be very Phoenix-specific. In the Drill-on-Phoenix scenario, once the Aggregate has been pushed through the Exchange (i.e. onto the drill-bit residing on the region server) we can then push the DrillAggregate across the drill-to-phoenix membrane and make it into a PhoenixServerAggregate that executes in the region server.
>>>> >>>>>
>>>> >>>>> Related issues:
>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>> >>>>>
>>>> >>>>> Julian
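As a side note, the A.BEER data-generation loop described above, written out as a small JDBC sketch. The connection URL is a placeholder, and the table is assumed to already exist with the four INTEGER columns B, E1, E2 and R:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class BeerTestData {
      public static void main(String[] args) throws Exception {
        // Placeholder URL; point it at the actual cluster's ZooKeeper quorum.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             PreparedStatement stmt =
                 conn.prepareStatement("UPSERT INTO A.BEER VALUES (?, ?, ?, ?)")) {
          int n = 1000;  // currently N=1000
          for (int x = 1; x <= n; x++) {
            stmt.setInt(1, x);         // B
            stmt.setInt(2, x % 10);    // E1: 10 distinct group-by values
            stmt.setInt(3, x % 100);   // E2: 100 distinct group-by values
            stmt.setInt(4, x);         // R
            stmt.executeUpdate();
          }
          // Commit the batched upserts (Phoenix connections default to autoCommit=false).
          conn.commit();
        }
      }
    }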
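And to make the rule idea above a bit more concrete, here is a rough sketch in the spirit of Jacques's suggestion: match an aggregate but require its input in the Phoenix convention, so the alternative is only plannable when it can be the first phase of a two-phase aggregation. Only the Calcite RelOptRule machinery here is standard; PhoenixRel.CONVENTION and PhoenixServerAggregate (and its constructor shape) are assumptions for illustration, the import paths are best-effort, and the sketch matches the logical DrillAggregateRel for simplicity where Jacques's suggestion targets the physical HashAgg/StreamAgg nodes.

    import org.apache.calcite.plan.RelOptRule;
    import org.apache.calcite.plan.RelOptRuleCall;
    import org.apache.calcite.plan.RelTraitSet;
    import org.apache.calcite.rel.RelNode;
    import org.apache.drill.exec.planner.logical.DrillAggregateRel;

    public class PhoenixAggPrule extends RelOptRule {
      public static final PhoenixAggPrule INSTANCE = new PhoenixAggPrule();

      private PhoenixAggPrule() {
        // Match a logical Drill aggregate over any input.
        super(operand(DrillAggregateRel.class, operand(RelNode.class, any())),
            "PhoenixAggPrule");
      }

      @Override
      public void onMatch(RelOptRuleCall call) {
        final DrillAggregateRel agg = call.rel(0);
        final RelNode input = call.rel(1);

        // Require the input in the Phoenix convention; if the child cannot be
        // converted, this alternative is never plannable, which restricts the
        // rule to the first (partial) phase of aggregation.
        final RelTraitSet phoenixTraits =
            input.getTraitSet().replace(PhoenixRel.CONVENTION);   // assumed constant
        final RelNode phoenixInput = convert(input, phoenixTraits);

        // Assumed operator: an aggregate Phoenix executes on the region server.
        call.transformTo(new PhoenixServerAggregate(agg.getCluster(),
            phoenixTraits, phoenixInput, agg.getGroupSet(), agg.getAggCallList()));
      }
    }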