The results we get back from the server-side scan are already the partially
aggregated values we need. GroupedAggregatingResultIterator will collapse
adjacent Tuples that happen to have the same row key. I'm not sure we want
or need this to happen. Instead, I think we just need to decode the
aggregated values directly from the result of the scan.
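
To make the "collapse adjacent Tuples" concern concrete, here is a minimal
sketch in plain Java. It does not use any Phoenix classes and is not how
GroupedAggregatingResultIterator is implemented; `Partial`,
`collapseAdjacent` and `mergeByKey` are hypothetical names made up for
illustration. It only models the general point that an adjacent-collapse
merge needs input sorted by key, while a map-based merge does not:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AdjacentCollapseSketch {
    // Hypothetical (group-by key, partial count) pair; not a Phoenix class.
    static final class Partial {
        final int key;
        final long count;
        Partial(int key, long count) { this.key = key; this.count = count; }
    }

    // Merges only neighbouring entries that share a key, so the result is
    // fully grouped only when the input is already sorted by key.
    static List<Partial> collapseAdjacent(List<Partial> in) {
        List<Partial> out = new ArrayList<>();
        for (Partial p : in) {
            int last = out.size() - 1;
            if (last >= 0 && out.get(last).key == p.key) {
                out.set(last, new Partial(p.key, out.get(last).count + p.count));
            } else {
                out.add(p);
            }
        }
        return out;
    }

    // Map-based merge: sums counts per key regardless of input order.
    static Map<Integer, Long> mergeByKey(List<Partial> in) {
        Map<Integer, Long> out = new TreeMap<>();
        for (Partial p : in) {
            out.merge(p.key, p.count, Long::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Partial> unsorted = Arrays.asList(
            new Partial(1, 50), new Partial(2, 50), new Partial(1, 50));
        System.out.println(collapseAdjacent(unsorted).size()); // 3: key 1 appears twice
        System.out.println(mergeByKey(unsorted));              // {1=100, 2=50}
    }
}
```

With a single tuple per group-by key in each scan result, neither merge has
anything to combine, which is why decoding alone should be enough.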

On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <maryann....@gmail.com> wrote:

> Hi James,
>
> bq. A few questions for you: not sure I understand the changes you made to
> PhoenixRecordReader. Is it necessary to wrap the server-side scan results
> in a GroupedAggregatingResultIterator? Each server-side scan will produce
> results with a single tuple per group by key. In Phoenix, the
> GroupedAggregatingResultIterator's function in life is to do the final
> merge. Note too that the results aren't sorted that come back from the
> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
> by the group by key). Or is this just to help in decoding the values coming
> back from the scan?
>
> It is necessary. I suppose what we should return as a partial result from
> PhoenixRecordReader is exactly the same as what we do in standalone
> Phoenix+Calcite, except that the result is partial or, say, incomplete. For
> example, if we have "select a, count(*) from t group by a", we should return
> rows that have "a" as the first expression value, and "count(*)" as the
> second expression value. This "count" expression actually needs a
> ClientAggregator for evaluation, and that's what this
> GroupedAggregatingResultIterator is used for.
> Since "each server-side scan will produce results with a single tuple per
> group by key", and PhoenixRecordReader is only dealing with one server-side
> result each time, we don't care how the group-by keys are arranged (ordered
> or unordered). Actually GroupedAggregatingResultIterator is not the
> group-by iterator we use for AggregatePlan. It does not "combine". It
> treats every row as a different group, by returning its rowkey as the
> group-by key (GroupedAggregatingResultIterator.java:56).
>
> In short, this iterator is for decoding the server-side values. So we may
> want to optimize this logic by removing this serialization and
> deserialization and having only one set of aggregators in the future.
>
> bq. Also, not sure what impact it has in the way we "combine" the scans in
> our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
> each of our scans could include duplicate group by keys. Is it ok to
> combine them in this case?
>
> It should not matter, or at least is not related to the problem I'm now
> having.
>
> bq. One more question: how is the group by key communicated back to Drill?
>
> According to the HashAggPrule, if it decides to create a two-phase
> aggregate, the first phase is now handled by Phoenix (after applying the
> PhoenixHashAggPrule). I assume then the partial results get shuffled based
> on the hash of their group-by keys (returned by PhoenixRecordReader). The
> final step is the Drill hash aggregation.
>
>
> This is my test table "A.BEER", which has four columns: "B", "E1", "E2",
> "R", all of type INTEGER. The data is generated like this:
> for (x=1 to N) { //currently N=1000
>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
> }
>
> The group-by query for testing is "SELECT e1, count(*) FROM a.beer GROUP
> BY e1".
> The expected result should be:
> 0 100
> 1 100
> 2 100
> 3 100
> 4 100
> 5 100
> 6 100
> 7 100
> 8 100
> 9 100
> The actual result was:
> 6 0
> 7 0
> 8 0
> 9 0
> 0 0
> 1 100
> 2 100
> 3 100
> 4 100
> 5 100
>
> Here I just tried another one "SELECT e2, count(*) FROM a.beer GROUP BY
> e2".
> Similarly, the expected result should have group-by keys from 0 to 99,
> each having a value of 10 as the count, while the actual result was:
> from group-by key 86 to 99, together with 0, their count values were all
> 0; the rest (1 to 85) all had the correct value 10.
>
> Looks to me that the scans were good but there was a problem with one of
> the hash buckets.
>
> Thanks,
> Maryann
>
>
> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <jamestay...@apache.org>
> wrote:
>
>> Nice progress, Maryann.
>>
>> A few questions for you: not sure I understand the changes you made to
>> PhoenixRecordReader. Is it necessary to wrap the server-side scan results
>> in a GroupedAggregatingResultIterator? Each server-side scan will produce
>> results with a single tuple per group by key. In Phoenix, the
>> GroupedAggregatingResultIterator's function in life is to do the final
>> merge. Note too that the results aren't sorted that come back from the
>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>> by the group by key). Or is this just to help in decoding the values coming
>> back from the scan?
>>
>> Also, not sure what impact it has in the way we "combine" the scans in
>> our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
>> each of our scans could include duplicate group by keys. Is it ok to
>> combine them in this case?
>>
>> One more question: how is the group by key communicated back to Drill?
>>
>> Thanks,
>> James
>>
>>
>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann....@gmail.com>
>> wrote:
>>
>>> Added a few fixes in the pull request. Tested with two regions; it turned
>>> out that half of the results are empty (count = 0).
>>> Not sure if there's anything wrong with
>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>> .
>>> Like Julian said, this rule looks a bit hacky.
>>>
>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>
>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>> index b911f6b..58bc918 100644
>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>    // If any of the aggregate functions are not one of these, then we
>>>    // currently won't generate a 2 phase plan.
>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>> -      return false;
>>> -    }
>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>> +//      return false;
>>> +//    }
>>>
>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>        String name = aggCall.getAggregation().getName();
>>>
>>> Thanks,
>>> Maryann
>>>
>>>
>>>
>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org> wrote:
>>>
>>>> Drill's current approach seems adequate for Drill alone but extending
>>>> it to a heterogeneous system that includes Phoenix seems like a hack.
>>>>
>>>> I think you should only create Prels for algebra nodes that you know
>>>> for sure are going to run on the Drill engine. If there's a
>>>> possibility that it would run in another engine such as Phoenix then
>>>> they should still be logical.
>>>>
>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann....@gmail.com>
>>>> wrote:
>>>> > The partial aggregate seems to be working now, with one interface
>>>> > extension and one bug fix in the Phoenix project. Will do some code
>>>> > cleanup and create a pull request soon.
>>>> >
>>>> > Still there was a hack in the Drill project which I made to force
>>>> > 2-phase aggregation. I'll try to fix that.
>>>> >
>>>> > Jacques, I have one question though, how can I verify that there are
>>>> > more than one slice and the shuffle happens?
>>>> >
>>>> >
>>>> > Thanks,
>>>> > Maryann
>>>> >
>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <jamestay...@apache.org>
>>>> > wrote:
>>>> >
>>>> >> Maryann,
>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>> >> required for a merge sort to occur - there's something that does
>>>> >> that, but it's not expected to be used in this context currently.
>>>> >>
>>>> >> IMHO, there's more of a clear value in getting the aggregation to use
>>>> >> Phoenix first, so I'd recommend going down that road as Jacques
>>>> >> mentioned above if possible. Once that's working, we can circle back
>>>> >> to the partial sort.
>>>> >>
>>>> >> Thoughts?
>>>> >> James
>>>> >>
>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <maryann....@gmail.com>
>>>> >> wrote:
>>>> >>
>>>> >>> I actually tried implementing partial sort with
>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured might be
>>>> >>> a little easier to start with than partial aggregation. But I found
>>>> >>> that even though the code worked (returned the right results), the
>>>> >>> Drill side sort turned out to be an ordinary sort instead of a merge,
>>>> >>> which it should have been. Any idea of how to fix that?
>>>> >>>
>>>> >>>
>>>> >>> Thanks,
>>>> >>> Maryann
>>>> >>>
>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <jacq...@dremio.com>
>>>> >>> wrote:
>>>> >>>
>>>> >>>> Right now this type of work is done here:
>>>> >>>>
>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>> >>>>
>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>> >>>>
>>>> >>>> With Distribution Trait application here:
>>>> >>>>
>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>> >>>>
>>>> >>>> To me, the easiest way to solve the Phoenix issue is by providing a
>>>> >>>> rule that matches HashAgg and StreamAgg but requires Phoenix
>>>> >>>> convention as input. It would replace everywhere but would only be
>>>> >>>> plannable when it is the first phase of aggregation.
>>>> >>>>
>>>> >>>> Thoughts?
>>>> >>>>
>>>> >>>>
>>>> >>>>
>>>> >>>> --
>>>> >>>> Jacques Nadeau
>>>> >>>> CTO and Co-Founder, Dremio
>>>> >>>>
>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org>
>>>> >>>> wrote:
>>>> >>>>
>>>> >>>>> Phoenix is able to perform quite a few relational operations on the
>>>> >>>>> region server: scan, filter, project, aggregate, sort (optionally
>>>> >>>>> with limit). However, the sort and aggregate are necessarily
>>>> >>>>> "local". They can only deal with data on that region server, and
>>>> >>>>> there needs to be a further operation to combine the results from
>>>> >>>>> the region servers.
>>>> >>>>>
>>>> >>>>> The question is how to plan such queries. I think the answer is an
>>>> >>>>> AggregateExchangeTransposeRule.
>>>> >>>>>
>>>> >>>>> The rule would spot an Aggregate on a data source that is split
>>>> >>>>> into multiple locations (partitions) and split it into a partial
>>>> >>>>> Aggregate that computes sub-totals and a summarizing Aggregate that
>>>> >>>>> combines those totals.
>>>> >>>>>
>>>> >>>>> How does the planner know that the Aggregate needs to be split?
>>>> >>>>> Since the data's distribution has changed, there would need to be an
>>>> >>>>> Exchange operator. It is the Exchange operator that triggers the
>>>> >>>>> rule to fire.
>>>> >>>>>
>>>> >>>>> There are some special cases. If the data is sorted as well as
>>>> >>>>> partitioned (say because the local aggregate uses a sort-based
>>>> >>>>> algorithm) we could maybe use a more efficient plan. And if the
>>>> >>>>> partition key is the same as the aggregation key we don't need a
>>>> >>>>> summarizing Aggregate, just a Union.
>>>> >>>>>
>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>> >>>>> Drill-on-Phoenix scenario, once the Aggregate has been pushed
>>>> >>>>> through the Exchange (i.e. onto the drill-bit residing on the
>>>> >>>>> region server) we can then push the DrillAggregate across the
>>>> >>>>> drill-to-phoenix membrane and make it into a PhoenixServerAggregate
>>>> >>>>> that executes in the region server.
>>>> >>>>>
>>>> >>>>> Related issues:
>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>> >>>>>
>>>> >>>>> Julian
>>>> >>>>>
>>>> >>>>
>>>> >>>>
>>>> >>>
>>>> >>
>>>>
>>>
>>>
>>
>
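
For reference, the two-phase count discussed in the quoted thread can be
modelled in plain Java without Drill or Phoenix. This is only a sketch
under stated assumptions: the class and method names are hypothetical, and
the split of the 1000 test rows into the ranges 1..500 and 501..1000 is an
assumed region boundary that the thread does not specify. It shows a
partial count per partition followed by a final phase that sums the partial
counts per group-by key (rather than counting the partial rows):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TwoPhaseCountSketch {
    // Phase 1 (models one region's partial aggregate): one partial count per
    // group-by key, with e1 = x % 10 for the rows x in [from, to].
    static Map<Integer, Long> partialCount(int from, int to) {
        Map<Integer, Long> partial = new HashMap<>();
        for (int x = from; x <= to; x++) {
            partial.merge(x % 10, 1L, Long::sum);
        }
        return partial;
    }

    // Phase 2 (models the final aggregate): sum the partial counts per key.
    static Map<Integer, Long> finalCount(List<Map<Integer, Long>> partials) {
        Map<Integer, Long> total = new TreeMap<>();
        for (Map<Integer, Long> partial : partials) {
            for (Map.Entry<Integer, Long> e : partial.entrySet()) {
                total.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Map<Integer, Long>> partials = new ArrayList<>();
        partials.add(partialCount(1, 500));    // assumed first region
        partials.add(partialCount(501, 1000)); // assumed second region
        System.out.println(finalCount(partials));
    }
}
```

Each key from 0 to 9 ends up with a total of 100, matching the expected
result for "SELECT e1, count(*) FROM a.beer GROUP BY e1" quoted above.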
