korlov42 commented on a change in pull request #8683:
URL: https://github.com/apache/ignite/pull/8683#discussion_r585492309



##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class SortAggregateNode<Row> extends AbstractNode<Row> implements 
SingleNode<Row>, Downstream<Row> {
+    /** */
+    private final AggregateType type;
+
+    /** */
+    private final Supplier<List<AccumulatorWrapper<Row>>> accFactory;
+
+    /** */
+    private final RowFactory<Row> rowFactory;
+
+    /** */
+    private final ImmutableBitSet grpSet;
+
+    /** */
+    private final Comparator<Row> comp;
+
+    /** */
+    private Row prevRow;
+
+    /** */
+    private Group grp;
+
+    /** */
+    private int requested;
+
+    /** */
+    private int waiting;
+
+    /** */
+    private int cmpRes;
+
+    /**
+     * @param ctx Execution context.
+     */
+    public SortAggregateNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        AggregateType type,
+        ImmutableBitSet grpSet,
+        Supplier<List<AccumulatorWrapper<Row>>> accFactory,
+        RowFactory<Row> rowFactory,
+        Comparator<Row> comp
+    ) {
+        super(ctx, rowType);
+
+        this.type = type;
+        this.accFactory = accFactory;
+        this.rowFactory = rowFactory;
+        this.grpSet = grpSet;
+        this.comp = comp;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) {
+        assert !F.isEmpty(sources()) && sources().size() == 1;
+        assert rowsCnt > 0 && requested == 0;
+        assert waiting <= 0;
+
+        try {

Review comment:
       Currently, all errors should be handled outside the execution node.

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class SortAggregateNode<Row> extends AbstractNode<Row> implements 
SingleNode<Row>, Downstream<Row> {
+    /** */
+    private final AggregateType type;
+
+    /** */
+    private final Supplier<List<AccumulatorWrapper<Row>>> accFactory;
+
+    /** */
+    private final RowFactory<Row> rowFactory;
+
+    /** */
+    private final ImmutableBitSet grpSet;
+
+    /** */
+    private final Comparator<Row> comp;
+
+    /** */
+    private Row prevRow;
+
+    /** */
+    private Group grp;
+
+    /** */
+    private int requested;
+
+    /** */
+    private int waiting;
+
+    /** */
+    private int cmpRes;
+
+    /**
+     * @param ctx Execution context.
+     */
+    public SortAggregateNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        AggregateType type,
+        ImmutableBitSet grpSet,
+        Supplier<List<AccumulatorWrapper<Row>>> accFactory,
+        RowFactory<Row> rowFactory,
+        Comparator<Row> comp
+    ) {
+        super(ctx, rowType);
+
+        this.type = type;
+        this.accFactory = accFactory;
+        this.rowFactory = rowFactory;
+        this.grpSet = grpSet;
+        this.comp = comp;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) {
+        assert !F.isEmpty(sources()) && sources().size() == 1;
+        assert rowsCnt > 0 && requested == 0;
+        assert waiting <= 0;
+
+        try {
+            checkState();
+
+            requested = rowsCnt;
+
+            if (waiting == 0) {
+                waiting = requested;
+
+                source().request(requested);

Review comment:
       I don't think it's a good idea to pass the requested amount straight 
through to the source. Assume the following case: at the bottom there is a scan 
node with 1kk rows, and we need to get a count over an empty group set 
(`select count(*) from MyTable`). If a node above requests a single row, this 
will lead to spawning 1kk tasks, one for each row. It seems the best way is for 
the aggregate node to request IN_BUFFER_SIZE rows from its source.

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistinctRowCount.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.ImmutableBitSet;
+
+/** */
+@SuppressWarnings("unused") // actually all methods are used by runtime 
generated classes
+public class IgniteMdDistinctRowCount extends RelMdDistinctRowCount {
+    public static final RelMetadataProvider SOURCE =
+        ReflectiveRelMetadataProvider.reflectiveSource(
+            BuiltInMethod.DISTINCT_ROW_COUNT.method, new 
IgniteMdDistinctRowCount());
+
+    /** {@inheritDoc} */
+    @Override public Double getDistinctRowCount(
+        RelNode rel,
+        RelMetadataQuery mq,
+        ImmutableBitSet groupKey,
+        RexNode predicate
+    ) {
+        double rowCount = mq.getRowCount(rel);
+
+        rowCount *= 1.0 - Math.pow(.5, groupKey.cardinality());

Review comment:
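       groupKey could be empty (groupKey.cardinality() == 0); in that case the 
factor `1.0 - Math.pow(.5, 0)` is 0 and the estimate collapses to zero, so this 
case should be covered here.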

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
##########
@@ -17,258 +17,112 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.mapping.Mappings;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 
-import static org.apache.calcite.plan.RelOptRule.convert;
-import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.util.ImmutableIntList.range;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.broadcast;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.hash;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
 
 /**
  *
  */
-public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel 
{
+public abstract class IgniteAggregate extends Aggregate {
     /** {@inheritDoc} */
-    public IgniteAggregate(RelOptCluster cluster, RelTraitSet traitSet, 
RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, 
List<AggregateCall> aggCalls) {
+    protected IgniteAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
         super(cluster, traitSet, ImmutableList.of(), input, groupSet, 
groupSets, aggCalls);
     }
 
     /** {@inheritDoc} */
-    public IgniteAggregate(RelInput input) {
+    protected IgniteAggregate(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
     }
 
-    /** {@inheritDoc} */
-    @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, 
ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggCalls) {
-        return new IgniteAggregate(getCluster(), traitSet, input, groupSet, 
groupSets, aggCalls);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple 
aggregate having hash distributed input
-        //    and all of input distribution keys are parts of aggregation 
group and vice versa.
-        // 3) Map-reduce aggregation is possible in case it's a simple 
aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(distribution)));
-
-            case RANDOM_DISTRIBUTED:
-                if (!groupSet.isEmpty() && isSimple(this)) {
-                    IgniteDistribution outDistr = hash(range(0, 
groupSet.cardinality()));
-                    IgniteDistribution inDistr = hash(groupSet.asList());
-
-                    return Pair.of(nodeTraits.replace(outDistr), 
ImmutableList.of(in.replace(inDistr)));
-                }
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                ImmutableIntList keys = distribution.getKeys();
-
-                if (isSimple(this) && groupSet.cardinality() == keys.size()) {
-                    Mappings.TargetMapping mapping = Commons.inverseMapping(
-                        groupSet, getInput().getRowType().getFieldCount());
-
-                    List<Integer> srcKeys = new ArrayList<>(keys.size());
-
-                    for (int key : keys) {
-                        int src = mapping.getSourceOpt(key);
-
-                        if (src == -1)
-                            break;
-
-                        srcKeys.add(src);
-                    }
-
-                    if (srcKeys.size() == keys.size())
-                        return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(hash(srcKeys, distribution.function()))));
-                }
-
-                break;
-
-            default:
-                break;
-        }
-
-        return Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(single())));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
-        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Aggregate is rewindable if its input is rewindable.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        RewindabilityTrait rewindability = isMapReduce(nodeTraits, in)
-            ? RewindabilityTrait.ONE_WAY
-            : TraitUtils.rewindability(in);
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(rewindability), 
ImmutableList.of(in.replace(rewindability))));
+    /** */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        if (groupSet.cardinality() == 0)
+            return 1;
+
+        Double groupsCnt = mq.getDistinctRowCount(getInput(), groupSet, null);
+
+        // Estimation of the groups count is not available.
+        // Use heuristic estimation for result rows count.
+        if (groupsCnt == null)
+            return super.estimateRowCount(mq);
+        else
+            return groupsCnt;
     }
 
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple 
aggregate having hash distributed input
-        //    and all of input distribution keys are parts aggregation group.
-        // 3) Map-reduce aggregation is possible in case it's a simple 
aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
-        IgniteDistribution distribution = TraitUtils.distribution(in);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                res.add(Pair.of(nodeTraits.replace(distribution), 
ImmutableList.of(in)));
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                if (isSimple(this)) {
-                    ImmutableIntList keys = distribution.getKeys();
-
-                    if (groupSet.cardinality() == keys.size()) {
-                        Mappings.TargetMapping mapping = 
Commons.inverseMapping(
-                            groupSet, getInput().getRowType().getFieldCount());
-
-                        IgniteDistribution outDistr = 
distribution.apply(mapping);
-
-                        if (outDistr.getType() == HASH_DISTRIBUTED)
-                            res.add(Pair.of(nodeTraits.replace(outDistr), 
ImmutableList.of(in)));
-                    }
-                }
-
-                break;
-
-            case RANDOM_DISTRIBUTED:
-                // Map-reduce aggregates
-                if (isSimple(this)) {
-                    res.add(Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(random()))));
-                    res.add(Pair.of(nodeTraits.replace(broadcast()), 
ImmutableList.of(in.replace(random()))));
-                }
-
-                break;
-
-            default:
-                break;
+    /** */
+    public double estimateMemoryForGroup(RelMetadataQuery mq) {
+        if (aggCalls.isEmpty())
+            return groupSet.cardinality() * IgniteCost.AVERAGE_FIELD_SIZE;
+        else {
+            double mem = 0d;
+
+            double grps = estimateRowCount(mq);
+            double rows = input.estimateRowCount(mq);
+
+            for (AggregateCall aggCall : aggCalls) {
+                if (aggCall.isDistinct())
+                    mem += IgniteCost.AGG_CALL_MEM_COST * rows / grps;
+                else
+                    mem += IgniteCost.AGG_CALL_MEM_COST;
+            }
+
+            return mem;
         }
-
-        if (!res.isEmpty())
-            return res;
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(single()))));
     }
 
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
-
-        return 
ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            
ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveCorrelation(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        return 
ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
-            inTraits));
-    }
-
-    /** {@inheritDoc} */
-    @Override public @NotNull RelNode createNode(RelTraitSet outTraits, 
List<RelTraitSet> inTraits) {
-        RelTraitSet in = inTraits.get(0);
-
-        if (!isMapReduce(outTraits, in))
-            return copy(outTraits, ImmutableList.of(convert(getInput(), in)));
+    /** */
+    public RelOptCost computeSelfCostHash(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
 
-        if (U.assertionsEnabled()) {
-            ImmutableList<RelTrait> diff = in.difference(outTraits);
+        double rows = mq.getRowCount(getInput());
 
-            assert diff.size() == 1 && F.first(diff) == 
TraitUtils.distribution(outTraits);
-        }
+        double groupsCnt = estimateRowCount(mq);
 
-        RelNode map = new IgniteMapAggregate(getCluster(), in, 
convert(getInput(), in), groupSet, groupSets, aggCalls);
-        return new IgniteReduceAggregate(getCluster(), outTraits, convert(map, 
outTraits), groupSet, groupSets, aggCalls, getRowType());
+        return costFactory.makeCost(
+            groupsCnt,

Review comment:
       ```suggestion
               rows,
   ```

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
##########
@@ -17,258 +17,112 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.mapping.Mappings;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 
-import static org.apache.calcite.plan.RelOptRule.convert;
-import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.util.ImmutableIntList.range;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.broadcast;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.hash;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
 
 /**
  *
  */
-public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel 
{
+public abstract class IgniteAggregate extends Aggregate {
     /** {@inheritDoc} */
-    public IgniteAggregate(RelOptCluster cluster, RelTraitSet traitSet, 
RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, 
List<AggregateCall> aggCalls) {
+    protected IgniteAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
         super(cluster, traitSet, ImmutableList.of(), input, groupSet, 
groupSets, aggCalls);
     }
 
     /** {@inheritDoc} */
-    public IgniteAggregate(RelInput input) {
+    protected IgniteAggregate(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
     }
 
-    /** {@inheritDoc} */
-    @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, 
ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggCalls) {
-        return new IgniteAggregate(getCluster(), traitSet, input, groupSet, 
groupSets, aggCalls);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple 
aggregate having hash distributed input
-        //    and all of input distribution keys are parts of aggregation 
group and vice versa.
-        // 3) Map-reduce aggregation is possible in case it's a simple 
aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(distribution)));
-
-            case RANDOM_DISTRIBUTED:
-                if (!groupSet.isEmpty() && isSimple(this)) {
-                    IgniteDistribution outDistr = hash(range(0, 
groupSet.cardinality()));
-                    IgniteDistribution inDistr = hash(groupSet.asList());
-
-                    return Pair.of(nodeTraits.replace(outDistr), 
ImmutableList.of(in.replace(inDistr)));
-                }
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                ImmutableIntList keys = distribution.getKeys();
-
-                if (isSimple(this) && groupSet.cardinality() == keys.size()) {
-                    Mappings.TargetMapping mapping = Commons.inverseMapping(
-                        groupSet, getInput().getRowType().getFieldCount());
-
-                    List<Integer> srcKeys = new ArrayList<>(keys.size());
-
-                    for (int key : keys) {
-                        int src = mapping.getSourceOpt(key);
-
-                        if (src == -1)
-                            break;
-
-                        srcKeys.add(src);
-                    }
-
-                    if (srcKeys.size() == keys.size())
-                        return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(hash(srcKeys, distribution.function()))));
-                }
-
-                break;
-
-            default:
-                break;
-        }
-
-        return Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(single())));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
-        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Aggregate is rewindable if its input is rewindable.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        RewindabilityTrait rewindability = isMapReduce(nodeTraits, in)
-            ? RewindabilityTrait.ONE_WAY
-            : TraitUtils.rewindability(in);
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(rewindability), 
ImmutableList.of(in.replace(rewindability))));
+    /** */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        if (groupSet.cardinality() == 0)
+            return 1;
+
+        Double groupsCnt = mq.getDistinctRowCount(getInput(), groupSet, null);
+
+        // Estimation of the groups count is not available.
+        // Use heuristic estimation for result rows count.
+        if (groupsCnt == null)
+            return super.estimateRowCount(mq);
+        else
+            return groupsCnt;
     }
 
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple 
aggregate having hash distributed input
-        //    and all of input distribution keys are parts aggregation group.
-        // 3) Map-reduce aggregation is possible in case it's a simple 
aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
-        IgniteDistribution distribution = TraitUtils.distribution(in);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                res.add(Pair.of(nodeTraits.replace(distribution), 
ImmutableList.of(in)));
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                if (isSimple(this)) {
-                    ImmutableIntList keys = distribution.getKeys();
-
-                    if (groupSet.cardinality() == keys.size()) {
-                        Mappings.TargetMapping mapping = 
Commons.inverseMapping(
-                            groupSet, getInput().getRowType().getFieldCount());
-
-                        IgniteDistribution outDistr = 
distribution.apply(mapping);
-
-                        if (outDistr.getType() == HASH_DISTRIBUTED)
-                            res.add(Pair.of(nodeTraits.replace(outDistr), 
ImmutableList.of(in)));
-                    }
-                }
-
-                break;
-
-            case RANDOM_DISTRIBUTED:
-                // Map-reduce aggregates
-                if (isSimple(this)) {
-                    res.add(Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(random()))));
-                    res.add(Pair.of(nodeTraits.replace(broadcast()), 
ImmutableList.of(in.replace(random()))));
-                }
-
-                break;
-
-            default:
-                break;
+    /** */
+    public double estimateMemoryForGroup(RelMetadataQuery mq) {
+        if (aggCalls.isEmpty())
+            return groupSet.cardinality() * IgniteCost.AVERAGE_FIELD_SIZE;
+        else {
+            double mem = 0d;
+
+            double grps = estimateRowCount(mq);
+            double rows = input.estimateRowCount(mq);
+
+            for (AggregateCall aggCall : aggCalls) {
+                if (aggCall.isDistinct())
+                    mem += IgniteCost.AGG_CALL_MEM_COST * rows / grps;
+                else
+                    mem += IgniteCost.AGG_CALL_MEM_COST;
+            }
+
+            return mem;
         }
-
-        if (!res.isEmpty())
-            return res;
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(single()))));
     }
 
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
-
-        return 
ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            
ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveCorrelation(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        return 
ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
-            inTraits));
-    }
-
-    /** {@inheritDoc} */
-    @Override public @NotNull RelNode createNode(RelTraitSet outTraits, 
List<RelTraitSet> inTraits) {
-        RelTraitSet in = inTraits.get(0);
-
-        if (!isMapReduce(outTraits, in))
-            return copy(outTraits, ImmutableList.of(convert(getInput(), in)));
+    /** */
+    public RelOptCost computeSelfCostHash(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
 
-        if (U.assertionsEnabled()) {
-            ImmutableList<RelTrait> diff = in.difference(outTraits);
+        double rows = mq.getRowCount(getInput());
 
-            assert diff.size() == 1 && F.first(diff) == 
TraitUtils.distribution(outTraits);
-        }
+        double groupsCnt = estimateRowCount(mq);
 
-        RelNode map = new IgniteMapAggregate(getCluster(), in, 
convert(getInput(), in), groupSet, groupSets, aggCalls);
-        return new IgniteReduceAggregate(getCluster(), outTraits, convert(map, 
outTraits), groupSet, groupSets, aggCalls, getRowType());
+        return costFactory.makeCost(
+            groupsCnt,
+            rows * IgniteCost.ROW_PASS_THROUGH_COST,
+            0,
+            groupsCnt * estimateMemoryForGroup(mq),
+            0
+        );
     }
 
     /** */
-    private boolean isMapReduce(RelTraitSet out, RelTraitSet in) {
-        return TraitUtils.distribution(out).satisfies(single())
-            && TraitUtils.distribution(in).satisfies(random());
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
-        return new IgniteAggregate(cluster, getTraitSet(), sole(inputs),
-            getGroupSet(), getGroupSets(), getAggCallList());
-    }
+    public RelOptCost computeSelfCostSort(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
 
-    /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
         double rows = mq.getRowCount(getInput());
 
-        // TODO: fix it when 
https://issues.apache.org/jira/browse/IGNITE-13543 will be resolved
-        // currently it's OK to have such a dummy cost because there is no 
other options
-        return planner.getCostFactory().makeCost(rows, rows * 
IgniteCost.ROW_PASS_THROUGH_COST, 0);
+        double groupsCnt = estimateRowCount(mq);
+
+        return costFactory.makeCost(
+            groupsCnt,

Review comment:
       ```suggestion
               rows,
   ```

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
##########
@@ -17,258 +17,112 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.mapping.Mappings;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 
-import static org.apache.calcite.plan.RelOptRule.convert;
-import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.util.ImmutableIntList.range;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.broadcast;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.hash;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
 
 /**
  *
  */
-public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel 
{
+public abstract class IgniteAggregate extends Aggregate {
     /** {@inheritDoc} */
-    public IgniteAggregate(RelOptCluster cluster, RelTraitSet traitSet, 
RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, 
List<AggregateCall> aggCalls) {
+    protected IgniteAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
         super(cluster, traitSet, ImmutableList.of(), input, groupSet, 
groupSets, aggCalls);
     }
 
     /** {@inheritDoc} */
-    public IgniteAggregate(RelInput input) {
+    protected IgniteAggregate(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
     }
 
-    /** {@inheritDoc} */
-    @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, 
ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggCalls) {
-        return new IgniteAggregate(getCluster(), traitSet, input, groupSet, 
groupSets, aggCalls);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple 
aggregate having hash distributed input
-        //    and all of input distribution keys are parts of aggregation 
group and vice versa.
-        // 3) Map-reduce aggregation is possible in case it's a simple 
aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(distribution)));
-
-            case RANDOM_DISTRIBUTED:
-                if (!groupSet.isEmpty() && isSimple(this)) {
-                    IgniteDistribution outDistr = hash(range(0, 
groupSet.cardinality()));
-                    IgniteDistribution inDistr = hash(groupSet.asList());
-
-                    return Pair.of(nodeTraits.replace(outDistr), 
ImmutableList.of(in.replace(inDistr)));
-                }
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                ImmutableIntList keys = distribution.getKeys();
-
-                if (isSimple(this) && groupSet.cardinality() == keys.size()) {
-                    Mappings.TargetMapping mapping = Commons.inverseMapping(
-                        groupSet, getInput().getRowType().getFieldCount());
-
-                    List<Integer> srcKeys = new ArrayList<>(keys.size());
-
-                    for (int key : keys) {
-                        int src = mapping.getSourceOpt(key);
-
-                        if (src == -1)
-                            break;
-
-                        srcKeys.add(src);
-                    }
-
-                    if (srcKeys.size() == keys.size())
-                        return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(hash(srcKeys, distribution.function()))));
-                }
-
-                break;
-
-            default:
-                break;
-        }
-
-        return Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(single())));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
-        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Aggregate is rewindable if its input is rewindable.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        RewindabilityTrait rewindability = isMapReduce(nodeTraits, in)
-            ? RewindabilityTrait.ONE_WAY
-            : TraitUtils.rewindability(in);
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(rewindability), 
ImmutableList.of(in.replace(rewindability))));
+    /** */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        if (groupSet.cardinality() == 0)
+            return 1;
+
+        Double groupsCnt = mq.getDistinctRowCount(getInput(), groupSet, null);
+
+        // Estimation of the groups count is not available.
+        // Use heuristic estimation for result rows count.
+        if (groupsCnt == null)
+            return super.estimateRowCount(mq);
+        else
+            return groupsCnt;
     }
 
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple 
aggregate having hash distributed input
-        //    and all of input distribution keys are parts aggregation group.
-        // 3) Map-reduce aggregation is possible in case it's a simple 
aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
-        IgniteDistribution distribution = TraitUtils.distribution(in);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                res.add(Pair.of(nodeTraits.replace(distribution), 
ImmutableList.of(in)));
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                if (isSimple(this)) {
-                    ImmutableIntList keys = distribution.getKeys();
-
-                    if (groupSet.cardinality() == keys.size()) {
-                        Mappings.TargetMapping mapping = 
Commons.inverseMapping(
-                            groupSet, getInput().getRowType().getFieldCount());
-
-                        IgniteDistribution outDistr = 
distribution.apply(mapping);
-
-                        if (outDistr.getType() == HASH_DISTRIBUTED)
-                            res.add(Pair.of(nodeTraits.replace(outDistr), 
ImmutableList.of(in)));
-                    }
-                }
-
-                break;
-
-            case RANDOM_DISTRIBUTED:
-                // Map-reduce aggregates
-                if (isSimple(this)) {
-                    res.add(Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(random()))));
-                    res.add(Pair.of(nodeTraits.replace(broadcast()), 
ImmutableList.of(in.replace(random()))));
-                }
-
-                break;
-
-            default:
-                break;
+    /** */
+    public double estimateMemoryForGroup(RelMetadataQuery mq) {
+        if (aggCalls.isEmpty())
+            return groupSet.cardinality() * IgniteCost.AVERAGE_FIELD_SIZE;

Review comment:
       rowSize should be used in case the group set is empty (i.e. groupSet.cardinality() == 0)

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapAggregateBase.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+/**
+ *
+ */
+public abstract class IgniteMapAggregateBase extends IgniteAggregate 
implements IgniteRel {

Review comment:
       what is the purpose of this class?

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortAggregateNode.java
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class SortAggregateNode<Row> extends AbstractNode<Row> implements 
SingleNode<Row>, Downstream<Row> {
+    /** */
+    private final AggregateType type;
+
+    /** */
+    private final Supplier<List<AccumulatorWrapper<Row>>> accFactory;
+
+    /** */
+    private final RowFactory<Row> rowFactory;
+
+    /** */
+    private final ImmutableBitSet grpSet;
+
+    /** */
+    private final Comparator<Row> comp;
+
+    /** */
+    private Row prevRow;
+
+    /** */
+    private Group grp;
+
+    /** */
+    private int requested;
+
+    /** */
+    private int waiting;
+
+    /** */
+    private int cmpRes;
+
+    /**
+     * @param ctx Execution context.
+     */
+    public SortAggregateNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        AggregateType type,
+        ImmutableBitSet grpSet,
+        Supplier<List<AccumulatorWrapper<Row>>> accFactory,
+        RowFactory<Row> rowFactory,
+        Comparator<Row> comp
+    ) {
+        super(ctx, rowType);
+
+        this.type = type;
+        this.accFactory = accFactory;
+        this.rowFactory = rowFactory;
+        this.grpSet = grpSet;

Review comment:
       let's verify that group set is not empty

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortAggregate.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+
+/**
+ *
+ */
+public class IgniteSortAggregate extends IgniteAggregateBase {
+    /** Collation. */
+    private final RelCollation collation;
+
+    /** {@inheritDoc} */
+    public IgniteSortAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
+        super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+
+        assert !TraitUtils.collation(traitSet).isDefault();
+
+        collation = TraitUtils.collation(traitSet);
+    }
+
+    /** {@inheritDoc} */
+    public IgniteSortAggregate(RelInput input) {
+        super(input);
+
+        collation = input.getCollation();
+
+        assert Objects.nonNull(collation);
+        assert !collation.isDefault();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, 
ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggCalls) {
+        return new IgniteSortAggregate(getCluster(), 
traitSet.replace(collation), input, groupSet, groupSets, aggCalls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
+        return new IgniteSortAggregate(cluster, getTraitSet(), sole(inputs),
+            getGroupSet(), getGroupSets(), getAggCallList());
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw).item("collation", collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+        RelCollation collation = 
RelCollations.of(ImmutableIntList.copyOf(groupSet.asList()));
+
+        return Pair.of(nodeTraits.replace(collation),
+            ImmutableList.of(inputTraits.get(0).replace(collation)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+        RelCollation requiredCollation = 
RelCollations.of(ImmutableIntList.copyOf(groupSet.asList()));

Review comment:
       Actually, any permutation of the group columns is a possible collation here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to