DRILL-768: Support 2-phase COUNT() aggregates. Create a new SUM aggregate function whose return type is non-nullable to match the return type of COUNT.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/78ae2658 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/78ae2658 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/78ae2658 Branch: refs/heads/master Commit: 78ae26589745b0ed538a15644f6fd6897cb9e5ad Parents: e9ac37d Author: Aman Sinha <[email protected]> Authored: Fri May 16 10:33:10 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon May 19 17:46:39 2014 -0700 ---------------------------------------------------------------------- .../exec/planner/physical/AggPrelBase.java | 180 +++++++++++++++++++ .../exec/planner/physical/AggPruleBase.java | 2 +- .../exec/planner/physical/HashAggPrel.java | 51 +----- .../exec/planner/physical/HashAggPrule.java | 10 +- .../exec/planner/physical/StreamAggPrel.java | 72 ++------ .../exec/planner/physical/StreamAggPrule.java | 21 ++- 6 files changed, 221 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java new file mode 100644 index 0000000..c3b1188 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.physical; + +import java.util.BitSet; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import net.hydromatic.linq4j.Ord; +import net.hydromatic.optiq.util.BitSets; + +import org.eigenbase.rel.AggregateCall; +import org.eigenbase.rel.AggregateRelBase; +import org.eigenbase.rel.Aggregation; +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.reltype.RelDataTypeFactory; +import org.eigenbase.sql.SqlAggFunction; +import org.eigenbase.sql.SqlFunctionCategory; +import org.eigenbase.sql.SqlKind; +import org.eigenbase.sql.type.OperandTypes; +import org.eigenbase.sql.type.ReturnTypes; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.planner.logical.DrillParseContext; + +import com.beust.jcommander.internal.Lists; +import com.google.common.collect.ImmutableList; + + +public abstract class AggPrelBase 
extends AggregateRelBase implements Prel{ + + protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2}; + + protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1 ; // default phase + protected List<NamedExpression> keys = Lists.newArrayList(); + protected List<NamedExpression> aggExprs = Lists.newArrayList(); + protected List<AggregateCall> phase2AggCallList = Lists.newArrayList(); + + + /** + * Specialized aggregate function for SUMing the COUNTs. Since return type of + * COUNT is non-nullable and return type of SUM is nullable, this class enables + * creating a SUM whose return type is non-nullable. + * + */ + public class SqlSumCountAggFunction extends SqlAggFunction { + + private final RelDataType type; + + public SqlSumCountAggFunction(RelDataType type) { + super("SUM", + SqlKind.OTHER_FUNCTION, + ReturnTypes.BIGINT, // use the inferred return type of SqlCountAggFunction + null, + OperandTypes.NUMERIC, + SqlFunctionCategory.NUMERIC); + + this.type = type; + } + + public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) { + return ImmutableList.of(type); + } + + public RelDataType getType() { + return type; + } + + public RelDataType getReturnType(RelDataTypeFactory typeFactory) { + return type; + } + + } + + public AggPrelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet, + List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException { + super(cluster, traits, child, groupSet, aggCalls); + this.operPhase = phase; + createKeysAndExprs(); + } + + public OperatorPhase getOperatorPhase() { + return operPhase; + } + + public List<NamedExpression> getKeys() { + return keys; + } + + public List<NamedExpression> getAggExprs() { + return aggExprs; + } + + public List<AggregateCall> getPhase2AggCalls() { + return phase2AggCallList; + } + + protected void createKeysAndExprs() { + final List<String> childFields = getChild().getRowType().getFieldNames(); + final List<String> fields = 
getRowType().getFieldNames(); + + for (int group : BitSets.toIter(groupSet)) { + FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN); + keys.add(new NamedExpression(fr, fr)); + } + + for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) { + int aggExprOrdinal = groupSet.cardinality() + aggCall.i; + FieldReference ref = new FieldReference(fields.get(aggExprOrdinal)); + LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext()); + NamedExpression ne = new NamedExpression(expr, ref); + aggExprs.add(ne); + + if (getOperatorPhase() == OperatorPhase.PHASE_1of2) { + if (aggCall.e.getAggregation().getName().equals("COUNT")) { + // If we are doing a COUNT aggregate in Phase1of2, then in Phase2of2 we should SUM the COUNTs, + Aggregation sumAggFun = new SqlSumCountAggFunction(aggCall.e.getType()); + AggregateCall newAggCall = + new AggregateCall( + sumAggFun, + aggCall.e.isDistinct(), + Collections.singletonList(aggExprOrdinal), + aggCall.e.getType(), + aggCall.e.getName()); + + phase2AggCallList.add(newAggCall); + } else { + phase2AggCallList.add(aggCall.e); + } + } + } + } + + protected LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) { + List<LogicalExpression> args = Lists.newArrayList(); + for(Integer i : call.getArgList()){ + args.add(new FieldReference(fn.get(i))); + } + + // for count(1). 
+ if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l)); + LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN ); + return expr; + } + + @Override + public Iterator<Prel> iterator() { + return PrelUtil.iter(getChild()); + } + + @Override + public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { + return logicalVisitor.visitPrel(this, value); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java index 563458e..4edeaf8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java @@ -67,7 +67,7 @@ public abstract class AggPruleBase extends Prule { for (AggregateCall aggCall : aggregate.getAggCallList()) { String name = aggCall.getAggregation().getName(); - if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX"))) { + if ( ! 
(name.equals("SUM") || name.equals("MIN") || name.equals("MAX") || name.equals("COUNT"))) { return false; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java index b2378be..31feb48 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.BitSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import net.hydromatic.linq4j.Ord; import net.hydromatic.optiq.util.BitSets; @@ -49,18 +50,19 @@ import org.eigenbase.relopt.RelTraitSet; import com.beust.jcommander.internal.Lists; -public class HashAggPrel extends AggregateRelBase implements Prel{ +public class HashAggPrel extends AggPrelBase implements Prel{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggPrel.class); public HashAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet, - List<AggregateCall> aggCalls) throws InvalidRelException { - super(cluster, traits, child, groupSet, aggCalls); + List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException { + super(cluster, traits, child, groupSet, aggCalls, phase); } public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) { try { - return new HashAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls); + return new HashAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls, + this.getOperatorPhase()); } catch (InvalidRelException e) { throw new 
AssertionError(e); } @@ -88,54 +90,16 @@ public class HashAggPrel extends AggregateRelBase implements Prel{ @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { - final List<String> childFields = getChild().getRowType().getFieldNames(); - final List<String> fields = getRowType().getFieldNames(); - List<NamedExpression> keys = Lists.newArrayList(); - List<NamedExpression> exprs = Lists.newArrayList(); - - for (int group : BitSets.toIter(groupSet)) { - FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN); - keys.add(new NamedExpression(fr, fr)); - } - - for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) { - FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i)); - LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext()); - exprs.add(new NamedExpression(expr, ref)); - } - Prel child = (Prel) this.getChild(); HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), - exprs.toArray(new NamedExpression[exprs.size()]), + aggExprs.toArray(new NamedExpression[aggExprs.size()]), 1.0f); return g; } - private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) { - List<LogicalExpression> args = Lists.newArrayList(); - for(Integer i : call.getArgList()){ - args.add(new FieldReference(fn.get(i))); - } - - // for count(1). 
- if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l)); - LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN ); - return expr; - } - - @Override - public Iterator<Prel> iterator() { - return PrelUtil.iter(getChild()); - } - - @Override - public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { - return logicalVisitor.visitPrel(this, value); - } - @Override public SelectionVectorMode[] getSupportedEncodings() { return SelectionVectorMode.DEFAULT; @@ -146,5 +110,4 @@ public class HashAggPrel extends AggregateRelBase implements Prel{ return SelectionVectorMode.NONE; } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java index 9395a1d..95c8362 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java @@ -22,6 +22,7 @@ import java.util.logging.Logger; import org.apache.drill.exec.planner.logical.DrillAggregateRel; import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase; import org.eigenbase.rel.InvalidRelException; import org.eigenbase.rel.RelNode; import org.eigenbase.relopt.RelOptRule; @@ -95,7 +96,8 @@ public class HashAggPrule extends AggPruleBase { HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput, aggregate.getGroupSet(), - aggregate.getAggCallList()); + aggregate.getAggCallList(), + 
OperatorPhase.PHASE_1of2); HashToRandomExchangePrel exch = new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys), @@ -103,7 +105,9 @@ public class HashAggPrule extends AggPruleBase { HashAggPrel phase2Agg = new HashAggPrel(aggregate.getCluster(), traits, exch, aggregate.getGroupSet(), - aggregate.getAggCallList()); + phase1Agg.getPhase2AggCalls(), + OperatorPhase.PHASE_2of2); + call.transformTo(phase2Agg); } @@ -122,7 +126,7 @@ public class HashAggPrule extends AggPruleBase { final RelNode convertedInput = convert(input, PrelUtil.fixTraits(call, traits)); HashAggPrel newAgg = new HashAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(), - aggregate.getAggCallList()); + aggregate.getAggCallList(), OperatorPhase.PHASE_1of1); call.transformTo(newAgg); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java index 5fb758a..9706254 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java @@ -19,24 +19,13 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; import java.util.BitSet; -import java.util.Iterator; import java.util.List; -import net.hydromatic.linq4j.Ord; -import net.hydromatic.optiq.util.BitSets; - -import org.apache.drill.common.expression.ExpressionPosition; -import org.apache.drill.common.expression.FieldReference; -import org.apache.drill.common.expression.FunctionCall; -import org.apache.drill.common.expression.LogicalExpression; -import 
org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.physical.config.SingleMergeExchange; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.planner.cost.DrillCostBase; import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory; -import org.apache.drill.exec.planner.logical.DrillParseContext; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.eigenbase.rel.AggregateCall; import org.eigenbase.rel.AggregateRelBase; @@ -48,30 +37,27 @@ import org.eigenbase.relopt.RelOptCost; import org.eigenbase.relopt.RelOptPlanner; import org.eigenbase.relopt.RelTraitSet; -import com.beust.jcommander.internal.Lists; - -public class StreamAggPrel extends AggregateRelBase implements Prel{ +public class StreamAggPrel extends AggPrelBase implements Prel{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamAggPrel.class); + + public StreamAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet, - List<AggregateCall> aggCalls) throws InvalidRelException { - super(cluster, traits, child, groupSet, aggCalls); - for (AggregateCall aggCall : aggCalls) { - if (aggCall.isDistinct()) { - throw new InvalidRelException("DrillAggregateRel does not support DISTINCT aggregates"); - } - } + List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException { + super(cluster, traits, child, groupSet, aggCalls, phase); } public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) { try { - return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls); + return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls, + this.getOperatorPhase()); } catch (InvalidRelException e) { throw new AssertionError(e); } } + 
@Override public RelOptCost computeSelfCost(RelOptPlanner planner) { if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { @@ -91,51 +77,15 @@ public class StreamAggPrel extends AggregateRelBase implements Prel{ @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { - final List<String> childFields = getChild().getRowType().getFieldNames(); - final List<String> fields = getRowType().getFieldNames(); - List<NamedExpression> keys = Lists.newArrayList(); - List<NamedExpression> exprs = Lists.newArrayList(); - - for (int group : BitSets.toIter(groupSet)) { - FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN); - keys.add(new NamedExpression(fr, fr)); - } - - for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) { - FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i)); - LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext()); - exprs.add(new NamedExpression(expr, ref)); - } Prel child = (Prel) this.getChild(); - StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), exprs.toArray(new NamedExpression[exprs.size()]), 1.0f); + StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), + aggExprs.toArray(new NamedExpression[aggExprs.size()]), 1.0f); return g; } - - private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) { - List<LogicalExpression> args = Lists.newArrayList(); - for(Integer i : call.getArgList()){ - args.add(new FieldReference(fn.get(i))); - } - - // for count(1). 
- if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l)); - LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN ); - return expr; - } - - @Override - public Iterator<Prel> iterator() { - return PrelUtil.iter(getChild()); - } - - @Override - public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { - return logicalVisitor.visitPrel(this, value); - } - + @Override public SelectionVectorMode[] getSupportedEncodings() { return SelectionVectorMode.ALL; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java index ff648a4..9a60a14 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java @@ -25,6 +25,7 @@ import net.hydromatic.optiq.util.BitSets; import org.apache.drill.exec.planner.logical.DrillAggregateRel; import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase; import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField; import org.eigenbase.rel.AggregateCall; import org.eigenbase.rel.InvalidRelException; @@ -59,8 +60,12 @@ public class StreamAggPrule extends AggPruleBase { final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0); final RelNode input = aggregate.getChild(); RelCollation collation = getCollation(aggregate); - RelTraitSet traits = null; + + if 
(aggregate.containsDistinctCall()) { + // currently, don't use StreamingAggregate if any of the logical aggrs contains DISTINCT + return; + } try { if (aggregate.getGroupSet().isEmpty()) { @@ -82,14 +87,16 @@ public class StreamAggPrule extends AggPruleBase { StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput, aggregate.getGroupSet(), - aggregate.getAggCallList()); + aggregate.getAggCallList(), + OperatorPhase.PHASE_1of2); UnionExchangePrel exch = new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg); StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch, aggregate.getGroupSet(), - aggregate.getAggCallList()); + phase1Agg.getPhase2AggCalls(), + OperatorPhase.PHASE_2of2); call.transformTo(phase2Agg); } @@ -135,7 +142,8 @@ public class StreamAggPrule extends AggPruleBase { StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput, aggregate.getGroupSet(), - aggregate.getAggCallList()); + aggregate.getAggCallList(), + OperatorPhase.PHASE_1of2); int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints(); @@ -147,7 +155,8 @@ public class StreamAggPrule extends AggPruleBase { StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), traits, exch, aggregate.getGroupSet(), - aggregate.getAggCallList()); + phase1Agg.getPhase2AggCalls(), + OperatorPhase.PHASE_2of2); call.transformTo(phase2Agg); } @@ -166,7 +175,7 @@ public class StreamAggPrule extends AggPruleBase { final RelNode convertedInput = convert(input, traits); StreamAggPrel newAgg = new StreamAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(), - aggregate.getAggCallList()); + aggregate.getAggCallList(), OperatorPhase.PHASE_1of1); call.transformTo(newAgg); }
