Made another two check-ins to https://github.com/jacques-n/drill/pull/5, first one for the changes James had suggested. The second check-in included some test cases that failed to use Phoenix partial aggregate because of https://issues.apache.org/jira/browse/CALCITE-926.
I also reproduced the problem with Phoenix+Calcite, but will make a new patch for CALCITE-926 to add some standalone test cases for Calcite.

Thanks,
Maryann

On Fri, Oct 9, 2015 at 1:30 PM, James Taylor <[email protected]> wrote:

Thanks for the updates to the patch, Maryann. It's looking very good - this will perform better, I believe. I made a few comments on the pull request.

FYI, I filed PHOENIX-2316 to add the missing information (namely the region server that the parallelized scan will go to) so that I can improve the assignment logic.

James

On Wed, Oct 7, 2015 at 1:11 PM, Maryann Xue <[email protected]> wrote:

Made another check-in for the pull request. All good now.

In order to compile and run, be sure to update the Phoenix project under Julian's branch.

Thanks,
Maryann

On Wed, Oct 7, 2015 at 12:19 PM, Jacques Nadeau <[email protected]> wrote:

I just filed a JIRA for the merge issue:

https://issues.apache.org/jira/browse/DRILL-3907

--
Jacques Nadeau
CTO and Co-Founder, Dremio

On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <[email protected]> wrote:

Drill doesn't currently have a merge-sort operation available outside the context of an exchange. See here:

https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver

We'll need to do a bit of refactoring to provide this functionality outside the context of an exchange. The one other thing we'll have to think about in this context is how we avoid doing an n-way merge in the case where we're not using the collation.

--
Jacques Nadeau
CTO and Co-Founder, Dremio

On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <[email protected]> wrote:

One thing I asked James about offline yesterday, and maybe we can discuss it a little bit in today's meeting:

Phoenix uses a list of lists of Scan objects to indicate Region boundaries and guideposts, and if the top-level list contains more than one element, it means that the results from the different Scanner/ResultIterator objects should be merge-sorted. We now use this list in the Drill integration to generate the different batches or slices. I see from the Drill plan of a simple select like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of the PhoenixTableScan. I guess this is a real sort rather than a merge-sort. So, optimally:
1) this should be a merge-sort (to be more accurate, a merge);
2) furthermore, if Drill has something to indicate the order among slices and batches, we could even turn it into a concat.

The structure of this Scan list might be helpful for 2), or we may have some logical representation for this. Otherwise, we can simply flatten this list into a one-dimensional list as we do now (in my check-in yesterday).

Thanks,
Maryann

On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <[email protected]> wrote:

Yes, but the partially aggregated results will not contain any duplicate rowkeys, since they are also the group-by keys. What we need is the aggregators, and to call aggregate for each row. We can write a new, simpler ResultIterator to replace this, but for now it should work correctly.
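A minimal sketch of the merge-versus-concat distinction described above, assuming nothing about the actual Phoenix or Drill iterator classes (SliceCombiner and its type parameters are hypothetical stand-ins): a k-way merge is enough when each slice is already sorted on the key, and a plain concatenation is enough when the order among the slices themselves is also known.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only, not Phoenix or Drill code.
// Option 1: k-way merge of per-slice sorted iterators (a "merge", not a full sort).
// Option 2: simple concatenation when the slices are already ordered among themselves.
public final class SliceCombiner {

  static <T> List<T> merge(List<Iterator<T>> sortedSlices, Comparator<T> keyOrder) {
    // Each heap entry remembers which slice it came from so we can advance that slice.
    final class Entry {
      final T row;
      final Iterator<T> source;
      Entry(T row, Iterator<T> source) { this.row = row; this.source = source; }
    }
    PriorityQueue<Entry> heap =
        new PriorityQueue<>((a, b) -> keyOrder.compare(a.row, b.row));
    for (Iterator<T> slice : sortedSlices) {
      if (slice.hasNext()) {
        heap.add(new Entry(slice.next(), slice));
      }
    }
    List<T> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Entry e = heap.poll();
      out.add(e.row);
      if (e.source.hasNext()) {
        heap.add(new Entry(e.source.next(), e.source));
      }
    }
    return out;
  }

  static <T> List<T> concat(List<Iterator<T>> orderedSlices) {
    // If the order among slices is known (e.g. by region boundary), no merge is needed.
    List<T> out = new ArrayList<>();
    for (Iterator<T> slice : orderedSlices) {
      slice.forEachRemaining(out::add);
    }
    return out;
  }
}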
On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <[email protected]> wrote:

The results we get back from the server-side scan are already the partially aggregated values we need. GroupedAggregatingResultIterator will collapse adjacent Tuples that happen to have the same row key. I'm not sure we want/need this to happen. Instead, I think we just need to decode the aggregated values directly from the result of the scan.

On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <[email protected]> wrote:

Hi James,

bq. A few questions for you: not sure I understand the changes you made to PhoenixRecordReader. Is it necessary to wrap the server-side scan results in a GroupedAggregatingResultIterator? Each server-side scan will produce results with a single tuple per group by key. In Phoenix, the GroupedAggregatingResultIterator's function in life is to do the final merge. Note too that the results that come back from the aggregated scan aren't sorted (while GroupedAggregatingResultIterator needs tuples sorted by the group by key). Or is this just to help in decoding the values coming back from the scan?

It is necessary. I suppose what we should return as a partial result from PhoenixRecordReader is exactly the same as what we return in standalone Phoenix+Calcite, except that the result is partial, or say, incomplete. For example, if we have "select a, count(*) from t group by a", we should return rows that have "a" as the first expression value and "count(*)" as the second expression value. For this "count" expression, it actually needs a ClientAggregator for evaluation, and that's what this GroupedAggregatingResultIterator is used for.
Since "each server-side scan will produce results with a single tuple per group by key", and PhoenixRecordReader is only dealing with one server-side result at a time, we don't care how the group-by keys are arranged (ordered or unordered). Actually, GroupedAggregatingResultIterator is not the group-by iterator we use for AggregatePlan. It does not "combine". It treats every row as a different group, by returning its rowkey as the group-by key (GroupedAggregatingResultIterator.java:56).

In short, this iterator is for decoding the server-side values. So we may want to optimize this logic in the future by removing this serialization and deserialization and having only one set of aggregators.

bq. Also, not sure what impact it has on the way we "combine" the scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as each of our scans could include duplicate group by keys. Is it ok to combine them in this case?

It should not matter, or at least it is not related to the problem I'm now having.

bq. One more question: how is the group by key communicated back to Drill?

According to the HashAggPrule, if it decides to create a two-phase aggregate, the first phase is now handled by Phoenix (after applying the PhoenixHashAggPrule).
I assume the partial results then get shuffled based on the hash of their group-by keys (returned by PhoenixRecordReader). The final step is the Drill hash aggregation.

This is my test table "A.BEER", which has four columns, "B", "E1", "E2", "R", all of INTEGER type. The data is generated like this:

for (x = 1 to N) {  // currently N = 1000
  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
}

The group-by query for testing is "SELECT e1, count(*) FROM a.beer GROUP BY e1".
The expected result should be:
0 100
1 100
2 100
3 100
4 100
5 100
6 100
7 100
8 100
9 100
The actual result was:
6 0
7 0
8 0
9 0
0 0
1 100
2 100
3 100
4 100
5 100

I just tried another one here, "SELECT e2, count(*) FROM a.beer GROUP BY e2". Similarly, the expected result should have group-by keys from 0 to 99, each having a count value of 10, while in the actual result the group-by keys 86 to 99, together with 0, all had a count value of 0; the rest (1 to 85) all had the correct value 10.

It looks to me like the scans were good, but there was a problem with one of the hash buckets.

Thanks,
Maryann

On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <[email protected]> wrote:

Nice progress, Maryann.

A few questions for you: not sure I understand the changes you made to PhoenixRecordReader. Is it necessary to wrap the server-side scan results in a GroupedAggregatingResultIterator? Each server-side scan will produce results with a single tuple per group by key. In Phoenix, the GroupedAggregatingResultIterator's function in life is to do the final merge. Note too that the results that come back from the aggregated scan aren't sorted (while GroupedAggregatingResultIterator needs tuples sorted by the group by key). Or is this just to help in decoding the values coming back from the scan?

Also, not sure what impact it has on the way we "combine" the scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as each of our scans could include duplicate group by keys. Is it ok to combine them in this case?

One more question: how is the group by key communicated back to Drill?

Thanks,
James

On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <[email protected]> wrote:

Added a few fixes in the pull request. Tested with two regions; it turned out that half of the results were empty (count = 0).
Not sure if there's anything wrong with https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java.
Like Julian said, this rule looks a bit hacky.
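For reference, the A.BEER setup described earlier in the thread above can be reproduced with a short JDBC snippet against Phoenix. This is a sketch only: the connection URL, the DDL, and the class name are assumptions rather than the actual test code, and in the failing case the GROUP BY query is issued through Drill, not directly against Phoenix as in the sanity check here.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical repro of the A.BEER data: B = x, E1 = x % 10, E2 = x % 100, R = x.
public final class BeerSetup {
  public static void main(String[] args) throws Exception {
    // Connection URL is an assumption; use your ZooKeeper quorum.
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181")) {
      try (Statement stmt = conn.createStatement()) {
        stmt.execute("CREATE TABLE IF NOT EXISTS A.BEER ("
            + "B INTEGER PRIMARY KEY, E1 INTEGER, E2 INTEGER, R INTEGER)");
      }
      try (PreparedStatement upsert =
               conn.prepareStatement("UPSERT INTO A.BEER VALUES (?, ?, ?, ?)")) {
        for (int x = 1; x <= 1000; x++) {   // currently N = 1000
          upsert.setInt(1, x);
          upsert.setInt(2, x % 10);
          upsert.setInt(3, x % 100);
          upsert.setInt(4, x);
          upsert.executeUpdate();
        }
      }
      conn.commit();  // Phoenix connections are not auto-commit by default

      // Sanity check directly against Phoenix; expected output is 0..9, each with count 100.
      try (Statement stmt = conn.createStatement();
           ResultSet rs = stmt.executeQuery(
               "SELECT e1, count(*) FROM a.beer GROUP BY e1")) {
        while (rs.next()) {
          System.out.println(rs.getInt(1) + " " + rs.getLong(2));
        }
      }
    }
  }
}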
To force a 2-phase HashAgg, I made a temporary change as well:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index b911f6b..58bc918 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
   // If any of the aggregate functions are not one of these, then we
   // currently won't generate a 2 phase plan.
   protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
-    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
-    RelNode child = call.rel(0).getInputs().get(0);
-    boolean smallInput = child.getRows() < settings.getSliceTarget();
-    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
-      return false;
-    }
+//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+//    RelNode child = call.rel(0).getInputs().get(0);
+//    boolean smallInput = child.getRows() < settings.getSliceTarget();
+//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
+//      return false;
+//    }

     for (AggregateCall aggCall : aggregate.getAggCallList()) {
       String name = aggCall.getAggregation().getName();

Thanks,
Maryann

On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <[email protected]> wrote:

Drill's current approach seems adequate for Drill alone, but extending it to a heterogeneous system that includes Phoenix seems like a hack.

I think you should only create Prels for algebra nodes that you know for sure are going to run on the Drill engine. If there's a possibility that a node would run in another engine such as Phoenix, then it should still be logical.

On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <[email protected]> wrote:

The partial aggregate seems to be working now, with one interface extension and one bug fix in the Phoenix project. Will do some code cleanup and create a pull request soon.

There is still a hack I made in the Drill project to force 2-phase aggregation. I'll try to fix that.

Jacques, I have one question though: how can I verify that there is more than one slice and that the shuffle happens?
Thanks,
Maryann

On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <[email protected]> wrote:

Maryann,
I believe Jacques mentioned that a little bit of refactoring is required for a merge sort to occur - there's something that does that, but it's not expected to be used in this context currently.

IMHO, there's clearer value in getting the aggregation to use Phoenix first, so I'd recommend going down that road as Jacques mentioned above, if possible. Once that's working, we can circle back to the partial sort.

Thoughts?
James

On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <[email protected]> wrote:

I actually tried implementing partial sort with https://github.com/jacques-n/drill/pull/4, which I figured might be a little easier to start with than partial aggregation. But I found that even though the code worked (returned the right results), the Drill-side sort turned out to be an ordinary sort instead of a merge, which it should have been. Any idea of how to fix that?

Thanks,
Maryann

On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <[email protected]> wrote:

Right now this type of work is done here:

https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java

With Distribution Trait application here:

https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java

To me, the easiest way to solve the Phoenix issue is by providing a rule that matches HashAgg and StreamAgg but requires Phoenix convention as input. It would apply everywhere but would only be plannable when it is the first phase of aggregation.

Thoughts?

--
Jacques Nadeau
CTO and Co-Founder, Dremio

On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <[email protected]> wrote:

Phoenix is able to perform quite a few relational operations on the region server: scan, filter, project, aggregate, sort (optionally with limit). However, the sort and aggregate are necessarily "local".
They >>>>>>>>>>> >>>>> can only deal with data on that region server, and there >>>>>>>>>>> needs to be a >>>>>>>>>>> >>>>> further operation to combine the results from the region >>>>>>>>>>> servers. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> The question is how to plan such queries. I think the >>>>>>>>>>> answer is an >>>>>>>>>>> >>>>> AggregateExchangeTransposeRule. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> The rule would spot an Aggregate on a data source that is >>>>>>>>>>> split into >>>>>>>>>>> >>>>> multiple locations (partitions) and split it into a >>>>>>>>>>> partial Aggregate >>>>>>>>>>> >>>>> that computes sub-totals and a summarizing Aggregate that >>>>>>>>>>> combines >>>>>>>>>>> >>>>> those totals. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> How does the planner know that the Aggregate needs to be >>>>>>>>>>> split? Since >>>>>>>>>>> >>>>> the data's distribution has changed, there would need to >>>>>>>>>>> be an >>>>>>>>>>> >>>>> Exchange operator. It is the Exchange operator that >>>>>>>>>>> triggers the rule >>>>>>>>>>> >>>>> to fire. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> There are some special cases. If the data is sorted as >>>>>>>>>>> well as >>>>>>>>>>> >>>>> partitioned (say because the local aggregate uses a >>>>>>>>>>> sort-based >>>>>>>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And >>>>>>>>>>> if the >>>>>>>>>>> >>>>> partition key is the same as the aggregation key we don't >>>>>>>>>>> need a >>>>>>>>>>> >>>>> summarizing Aggregate, just a Union. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the >>>>>>>>>>> Drill-on-Phoenix >>>>>>>>>>> >>>>> scenario, once the Aggregate has been pushed through the >>>>>>>>>>> Exchange >>>>>>>>>>> >>>>> (i.e. onto the drill-bit residing on the region server) we >>>>>>>>>>> can then >>>>>>>>>>> >>>>> push the DrillAggregate across the drill-to-phoenix >>>>>>>>>>> membrane and make >>>>>>>>>>> >>>>> it into a PhoenixServerAggregate that executes in the >>>>>>>>>>> region server. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> Related issues: >>>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840 >>>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751 >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> Julian >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> >>>>>>>>>>> >>> >>>>>>>>>>> >> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> >
