TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa)

Closes #136


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0dfa3972
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0dfa3972
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0dfa3972

Branch: refs/heads/block_iteration
Commit: 0dfa3972c6a52d785b8e55f91d0906456a3926b3
Parents: de28c82
Author: Jaehwa Jung <[email protected]>
Authored: Wed Oct 8 11:35:45 2014 +0900
Committer: Jaehwa Jung <[email protected]>
Committed: Wed Oct 8 11:35:45 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../main/java/org/apache/tajo/SessionVars.java  |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 +
 .../eval/AggregationFunctionCallEval.java       |  38 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  56 ++-
 .../tajo/engine/planner/enforce/Enforcer.java   |  13 +
 .../engine/planner/global/GlobalPlanner.java    |  58 +--
 .../global/builder/DistinctGroupbyBuilder.java  | 329 ++++++++++++-
 .../planner/logical/DistinctGroupbyNode.java    |  52 +-
 .../DistinctGroupbyFirstAggregationExec.java    | 476 +++++++++++++++++++
 .../DistinctGroupbySecondAggregationExec.java   | 295 ++++++++++++
 .../DistinctGroupbyThirdAggregationExec.java    | 304 ++++++++++++
 .../tajo/master/querymaster/Repartitioner.java  |  42 +-
 .../tajo/master/querymaster/SubQuery.java       |  28 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |   8 +
 .../tajo/engine/query/TestGroupByQuery.java     |  60 ++-
 .../testDistinctAggregation8.sql                |   9 +
 .../testDistinctAggregation_case10.sql          |   5 +
 .../testDistinctAggregation_case9.sql           |  11 +
 .../testDistinctAggregation8.result             |   7 +
 .../testDistinctAggregation_case10.result       |   3 +
 .../testDistinctAggregation_case9.result        |   6 +
 .../TestTajoCli/testHelpSessionVars.result      |   1 +
 .../apache/tajo/storage/TupleComparator.java    |   8 +-
 24 files changed, 1735 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index bff9583..7aa7a0c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,8 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and 
jaehwa)
+
     TAJO-1093: DateTimeFormat.to_char() is slower than 
SimpleDateFormat.format().
     (Jihun Kang via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java 
b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index cc875b2..1229849 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -98,6 +98,8 @@ public enum SessionVars implements ConfigKey {
   TABLE_PARTITION_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME,
       "shuffle output size for partition table write (mb)", DEFAULT),
 
+  GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple 
level groupby enabled", DEFAULT),
+
   // for physical Executors
   EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort 
buffer size for external sort (mb)", DEFAULT),
   HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited 
size for hash join (mb)", DEFAULT),

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java 
b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index f9f5e4a..66d3030 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -316,6 +316,8 @@ public class TajoConf extends Configuration {
     
$DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb",
 256),
     
$DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb",
 256),
 
+    $GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", 
true),
+
     // for physical Executors
     
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 
200L),
     
$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
index ab18aa9..3216519 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
@@ -30,7 +30,10 @@ import org.apache.tajo.storage.VTuple;
 
 public class AggregationFunctionCallEval extends FunctionEval implements 
Cloneable {
   @Expose protected AggFunction instance;
-  @Expose boolean firstPhase = false;
+  @Expose boolean intermediatePhase = false;
+  @Expose boolean finalPhase = true;
+  @Expose String alias;
+
   private Tuple params;
 
   protected AggregationFunctionCallEval(EvalType type, FunctionDesc desc, 
AggFunction instance, EvalNode[] givenArgs) {
@@ -58,7 +61,8 @@ public class AggregationFunctionCallEval extends FunctionEval 
implements Cloneab
       }
     }
 
-    if (firstPhase) {
+    if (!intermediatePhase && !finalPhase) {
+      // firstPhase
       instance.eval(context, params);
     } else {
       instance.merge(context, params);
@@ -71,7 +75,7 @@ public class AggregationFunctionCallEval extends FunctionEval 
implements Cloneab
   }
 
   public Datum terminate(FunctionContext context) {
-    if (firstPhase) {
+    if (!finalPhase) {
       return instance.getPartialResult(context);
     } else {
       return instance.terminate(context);
@@ -80,18 +84,40 @@ public class AggregationFunctionCallEval extends 
FunctionEval implements Cloneab
 
   @Override
   public DataType getValueType() {
-    if (firstPhase) {
+    if (!finalPhase) {
       return instance.getPartialResultType();
     } else {
       return funcDesc.getReturnType();
     }
   }
 
+  public void setAlias(String alias) { this.alias = alias; }
+
+  public String getAlias() { return  this.alias; }
+
   public Object clone() throws CloneNotSupportedException {
-    return super.clone();
+    AggregationFunctionCallEval clone = 
(AggregationFunctionCallEval)super.clone();
+
+    clone.finalPhase = finalPhase;
+    clone.intermediatePhase = intermediatePhase;
+    clone.alias = alias;
+    clone.instance = (AggFunction)instance.clone();
+
+    return clone;
   }
 
   public void setFirstPhase() {
-    this.firstPhase = true;
+    this.finalPhase = false;
+    this.intermediatePhase = false;
+  }
+
+  public void setFinalPhase() {
+    this.finalPhase = true;
+    this.intermediatePhase = false;
+  }
+
+  public void setIntermediatePhase() {
+    this.finalPhase = false;
+    this.intermediatePhase = true;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 2730202..6b1c65c 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -43,6 +43,7 @@ import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer;
 import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageConstants;
@@ -56,6 +57,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Stack;
 
@@ -1047,17 +1049,61 @@ public class PhysicalPlannerImpl implements 
PhysicalPlanner {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, 
distinctNode);
     if (property != null) {
-      DistinctAggregationAlgorithm algorithm = 
property.getDistinct().getAlgorithm();
-      if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
-        return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
+      if (property.getDistinct().getIsMultipleAggregation()) {
+        MultipleAggregationStage stage = 
property.getDistinct().getMultipleAggregationStage();
+
+        if (stage == MultipleAggregationStage.FIRST_STAGE) {
+          return new DistinctGroupbyFirstAggregationExec(context, 
distinctNode, subOp);
+        } else if (stage == MultipleAggregationStage.SECOND_STAGE) {
+          return new DistinctGroupbySecondAggregationExec(context, 
distinctNode,
+              createSortExecForDistinctGroupby(context, distinctNode, subOp, 
2));
+        } else {
+          return new DistinctGroupbyThirdAggregationExec(context, distinctNode,
+              createSortExecForDistinctGroupby(context, distinctNode, subOp, 
3));
+        }
       } else {
-        return createSortAggregationDistinctGroupbyExec(context, distinctNode, 
subOp, property.getDistinct());
+        DistinctAggregationAlgorithm algorithm = 
property.getDistinct().getAlgorithm();
+        if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
+          return createInMemoryDistinctGroupbyExec(context, distinctNode, 
subOp);
+        } else {
+          return createSortAggregationDistinctGroupbyExec(context, 
distinctNode, subOp, property.getDistinct());
+        }
       }
     } else {
       return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
     }
   }
 
+  private SortExec createSortExecForDistinctGroupby(TaskAttemptContext context,
+                                                    DistinctGroupbyNode 
distinctNode,
+                                                    PhysicalExec subOp,
+                                                    int phase) throws 
IOException {
+    SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+    //2 phase: seq, groupby columns, distinct1 keys, distinct2 keys,
+    //3 phase: groupby columns, seq, distinct1 keys, distinct2 keys,
+    List<SortSpec> sortSpecs = new ArrayList<SortSpec>();
+    if (phase == 2) {
+      sortSpecs.add(new 
SortSpec(distinctNode.getTargets()[0].getNamedColumn()));
+    }
+    for (Column eachColumn: distinctNode.getGroupingColumns()) {
+      sortSpecs.add(new SortSpec(eachColumn));
+    }
+    if (phase == 3) {
+      sortSpecs.add(new 
SortSpec(distinctNode.getTargets()[0].getNamedColumn()));
+    }
+    for (GroupbyNode eachGroupbyNode: distinctNode.getGroupByNodes()) {
+      for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
+        sortSpecs.add(new SortSpec(eachColumn));
+      }
+    }
+    sortNode.setSortSpecs(sortSpecs.toArray(new SortSpec[]{}));
+    sortNode.setInSchema(distinctNode.getInSchema());
+    sortNode.setOutSchema(distinctNode.getInSchema());
+    ExternalSortExec sortExec = new ExternalSortExec(context, sm, sortNode, 
subOp);
+
+    return sortExec;
+  }
+
   private PhysicalExec createInMemoryDistinctGroupbyExec(TaskAttemptContext 
ctx,
       DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp) throws 
IOException {
     return new DistinctGroupbyHashAggregationExec(ctx, distinctGroupbyNode, 
subOp);
@@ -1145,7 +1191,7 @@ public class PhysicalPlannerImpl implements 
PhysicalPlanner {
 
   }
 
-  private EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, 
LogicalNode node) {
+  public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, 
LogicalNode node) {
     if (enforcer == null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index 031569e..e2d7744 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -24,6 +24,7 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.ProtoObject;
 import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.util.TUtil;
 
@@ -135,13 +136,25 @@ public class Enforcer implements 
ProtoObject<EnforcerProto> {
   public void enforceDistinctAggregation(int pid,
                                          DistinctAggregationAlgorithm 
algorithm,
                                          List<SortSpecArray> sortSpecArrays) {
+    enforceDistinctAggregation(pid, false, null, algorithm, sortSpecArrays);
+  }
+
+  public void enforceDistinctAggregation(int pid,
+                                         boolean isMultipleAggregation,
+                                         MultipleAggregationStage stage,
+                                         DistinctAggregationAlgorithm 
algorithm,
+                                         List<SortSpecArray> sortSpecArrays) {
     EnforceProperty.Builder builder = newProperty();
     DistinctGroupbyEnforcer.Builder enforce = 
DistinctGroupbyEnforcer.newBuilder();
     enforce.setPid(pid);
+    enforce.setIsMultipleAggregation(isMultipleAggregation);
     enforce.setAlgorithm(algorithm);
     if (sortSpecArrays != null) {
       enforce.addAllSortSpecArrays(sortSpecArrays);
     }
+    if (stage != null) {
+      enforce.setMultipleAggregationStage(stage);
+    }
 
     builder.setType(EnforceType.DISTINCT_GROUP_BY);
     builder.setDistinct(enforce.build());

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 432589b..01e02d7 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -658,47 +658,6 @@ public class GlobalPlanner {
     return rewritten;
   }
 
-  public ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan 
masterPlan, ExecutionBlock lastBlock,
-                                                  DistinctGroupbyNode 
firstPhaseGroupBy,
-                                                  DistinctGroupbyNode 
secondPhaseGroupBy) {
-    DataChannel lastDataChannel = null;
-
-    // It pushes down the first phase group-by operator into all child blocks.
-    //
-    // (second phase)    G (currentBlock)
-    //                  /|\
-    //                / / | \
-    // (first phase) G G  G  G (child block)
-
-    // They are already connected one another.
-    // So, we don't need to connect them again.
-    for (DataChannel dataChannel : 
masterPlan.getIncomingChannels(lastBlock.getId())) {
-      if (firstPhaseGroupBy.isEmptyGrouping()) {
-        dataChannel.setShuffle(HASH_SHUFFLE, 
firstPhaseGroupBy.getGroupingColumns(), 1);
-      } else {
-        dataChannel.setShuffle(HASH_SHUFFLE, 
firstPhaseGroupBy.getGroupingColumns(), 32);
-      }
-      dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
-      ExecutionBlock childBlock = 
masterPlan.getExecBlock(dataChannel.getSrcId());
-
-      // Why must firstPhaseGroupby be copied?
-      //
-      // A groupby in each execution block can have different child.
-      // It affects groupby's input schema.
-      DistinctGroupbyNode firstPhaseGroupbyCopy = 
PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
-      firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
-      childBlock.setPlan(firstPhaseGroupbyCopy);
-
-      // just keep the last data channel.
-      lastDataChannel = dataChannel;
-    }
-
-    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), 
lastDataChannel);
-    secondPhaseGroupBy.setChild(scanNode);
-    lastBlock.setPlan(secondPhaseGroupBy);
-    return lastBlock;
-  }
-
   /**
    * If there are at least one distinct aggregation function, a query works as 
if the query is rewritten as follows:
    *
@@ -824,8 +783,20 @@ public class GlobalPlanner {
     ExecutionBlock currentBlock;
 
     if (groupbyNode.isDistinct()) { // if there is at one distinct aggregation 
function
-      DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
-      return builder.buildPlan(context, lastBlock, groupbyNode);
+      boolean multiLevelEnabled = 
context.getPlan().getContext().getBool(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED);
+
+      if (multiLevelEnabled) {
+        if (PlannerUtil.findTopNode(groupbyNode, NodeType.UNION) == null) {
+          DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+          return builder.buildMultiLevelPlan(context, lastBlock, groupbyNode);
+        } else {
+          DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+          return builder.buildPlan(context, lastBlock, groupbyNode);
+        }
+      } else {
+        DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+        return builder.buildPlan(context, lastBlock, groupbyNode);
+      }
     } else {
       GroupbyNode firstPhaseGroupby = 
createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);
 
@@ -968,6 +939,7 @@ public class GlobalPlanner {
         firstPhaseEvals[i].setFirstPhase();
         firstPhaseEvalNames[i] = 
plan.generateUniqueColumnName(firstPhaseEvals[i]);
         FieldEval param = new FieldEval(firstPhaseEvalNames[i], 
firstPhaseEvals[i].getValueType());
+        secondPhaseEvals[i].setFinalPhase();
         secondPhaseEvals[i].setArgs(new EvalNode[] {param});
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
index 8727b84..cbe2d7e 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
+import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
@@ -36,11 +37,14 @@ import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.util.TUtil;
 
@@ -56,6 +60,255 @@ public class DistinctGroupbyBuilder {
     this.globalPlanner = globalPlanner;
   }
 
+  public ExecutionBlock buildMultiLevelPlan(GlobalPlanContext context,
+                                            ExecutionBlock latestExecBlock,
+                                            LogicalNode currentNode) throws 
PlanningException {
+    try {
+      GroupbyNode groupbyNode = (GroupbyNode) currentNode;
+
+      LogicalPlan plan = context.getPlan().getLogicalPlan();
+
+      DistinctGroupbyNode baseDistinctNode =
+          buildMultiLevelBaseDistinctGroupByNode(context, latestExecBlock, 
groupbyNode);
+      baseDistinctNode.setGroupbyPlan(groupbyNode);
+
+      // Set total Aggregation Functions.
+      AggregationFunctionCallEval[] aggFunctions =
+          new 
AggregationFunctionCallEval[groupbyNode.getAggFunctions().length];
+
+      for (int i = 0; i < aggFunctions.length; i++) {
+        aggFunctions[i] = (AggregationFunctionCallEval) 
groupbyNode.getAggFunctions()[i].clone();
+        aggFunctions[i].setFirstPhase();
+        // If there is not grouping column, we can't find column alias.
+        // Thus we should find the alias at Groupbynode output schema.
+        if (groupbyNode.getGroupingColumns().length == 0
+            && aggFunctions.length == 
groupbyNode.getOutSchema().getColumns().size()) {
+          
aggFunctions[i].setAlias(groupbyNode.getOutSchema().getColumn(i).getQualifiedName());
+        }
+      }
+
+      if (groupbyNode.getGroupingColumns().length == 0
+          && aggFunctions.length == 
groupbyNode.getOutSchema().getColumns().size()) {
+        groupbyNode.setAggFunctions(aggFunctions);
+      }
+
+      baseDistinctNode.setAggFunctions(aggFunctions);
+
+      // Create First, SecondStage's Node using baseNode
+      DistinctGroupbyNode firstStageDistinctNode = PlannerUtil.clone(plan, 
baseDistinctNode);
+      DistinctGroupbyNode secondStageDistinctNode = PlannerUtil.clone(plan, 
baseDistinctNode);
+      DistinctGroupbyNode thirdStageDistinctNode = PlannerUtil.clone(plan, 
baseDistinctNode);
+
+      // Set second, third non-distinct aggregation's eval node to field eval
+      GroupbyNode lastGroupbyNode = 
secondStageDistinctNode.getGroupByNodes().get(secondStageDistinctNode.getGroupByNodes().size()
 - 1);
+      if (!lastGroupbyNode.isDistinct()) {
+        int index = 0;
+        for (AggregationFunctionCallEval aggrFunction: 
lastGroupbyNode.getAggFunctions()) {
+          aggrFunction.setIntermediatePhase();
+          aggrFunction.setArgs(new EvalNode[]{new 
FieldEval(lastGroupbyNode.getTargets()[index].getNamedColumn())});
+          index++;
+        }
+      }
+      lastGroupbyNode = 
thirdStageDistinctNode.getGroupByNodes().get(thirdStageDistinctNode.getGroupByNodes().size()
 - 1);
+      if (!lastGroupbyNode.isDistinct()) {
+        int index = 0;
+        for (AggregationFunctionCallEval aggrFunction: 
lastGroupbyNode.getAggFunctions()) {
+          aggrFunction.setFirstPhase();
+          aggrFunction.setArgs(new EvalNode[]{new 
FieldEval(lastGroupbyNode.getTargets()[index].getNamedColumn())});
+          index++;
+        }
+      }
+
+      // Set in & out schema for each DistinctGroupbyNode.
+      
secondStageDistinctNode.setInSchema(firstStageDistinctNode.getOutSchema());
+      
secondStageDistinctNode.setOutSchema(firstStageDistinctNode.getOutSchema());
+      
thirdStageDistinctNode.setInSchema(firstStageDistinctNode.getOutSchema());
+      thirdStageDistinctNode.setOutSchema(groupbyNode.getOutSchema());
+
+      // Set latestExecBlock's plan with firstDistinctNode
+      latestExecBlock.setPlan(firstStageDistinctNode);
+
+      // Make SecondStage ExecutionBlock
+      ExecutionBlock secondStageBlock = context.getPlan().newExecutionBlock();
+
+      // Make ThirdStage ExecutionBlock
+      ExecutionBlock thirdStageBlock = context.getPlan().newExecutionBlock();
+
+      // Set Enforcer
+      setMultiStageAggregationEnforcer(latestExecBlock, 
firstStageDistinctNode, secondStageBlock,
+          secondStageDistinctNode, thirdStageBlock, thirdStageDistinctNode);
+
+      //Create data channel FirstStage to SecondStage
+      DataChannel firstChannel = new DataChannel(latestExecBlock, 
secondStageBlock, HASH_SHUFFLE, 32);
+
+      
firstChannel.setShuffleKeys(firstStageDistinctNode.getFirstStageShuffleKeyColumns());
+      firstChannel.setSchema(firstStageDistinctNode.getOutSchema());
+      firstChannel.setStoreType(globalPlanner.getStoreType());
+
+      ScanNode scanNode = 
GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), 
firstChannel);
+      secondStageDistinctNode.setChild(scanNode);
+
+      secondStageBlock.setPlan(secondStageDistinctNode);
+
+      context.getPlan().addConnect(firstChannel);
+
+      DataChannel secondChannel;
+      //Create data channel SecondStage to ThirdStage
+      if (groupbyNode.isEmptyGrouping()) {
+        secondChannel = new DataChannel(secondStageBlock, thirdStageBlock, 
HASH_SHUFFLE, 1);
+        
secondChannel.setShuffleKeys(firstStageDistinctNode.getGroupingColumns());
+      } else {
+        secondChannel = new DataChannel(secondStageBlock, thirdStageBlock, 
HASH_SHUFFLE, 32);
+        
secondChannel.setShuffleKeys(firstStageDistinctNode.getGroupingColumns());
+      }
+      secondChannel.setSchema(secondStageDistinctNode.getOutSchema());
+      secondChannel.setStoreType(globalPlanner.getStoreType());
+
+      scanNode = 
GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), 
secondChannel);
+      thirdStageDistinctNode.setChild(scanNode);
+
+      thirdStageBlock.setPlan(thirdStageDistinctNode);
+
+      context.getPlan().addConnect(secondChannel);
+
+      if (GlobalPlanner.hasUnionChild(firstStageDistinctNode)) {
+        buildDistinctGroupbyAndUnionPlan(
+            context.getPlan(), latestExecBlock, firstStageDistinctNode, 
firstStageDistinctNode);
+      }
+
+      return thirdStageBlock;
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new PlanningException(e);
+    }
+  }
+
+  private DistinctGroupbyNode 
buildMultiLevelBaseDistinctGroupByNode(GlobalPlanContext context,
+                                                                     
ExecutionBlock latestExecBlock,
+                                                                     
GroupbyNode groupbyNode) {
+    LogicalPlan plan = context.getPlan().getLogicalPlan();
+
+    /*
+     Making DistinctGroupbyNode from GroupByNode
+     select col1, count(distinct col2), count(distinct col3), sum(col4) from 
... group by col1
+     => DistinctGroupbyNode
+        Distinct Seq
+        grouping key = col1
+        Sub GroupbyNodes
+         - GroupByNode1: grouping(col2), expr(count distinct col2)
+         - GroupByNode2: grouping(col3), expr(count distinct col3)
+         - GroupByNode3: expr(sum col4)
+    */
+    List<Column> originalGroupingColumns = 
Arrays.asList(groupbyNode.getGroupingColumns());
+
+    List<GroupbyNode> childGroupbyNodes = new ArrayList<GroupbyNode>();
+
+    List<AggregationFunctionCallEval> otherAggregationFunctionCallEvals = new 
ArrayList<AggregationFunctionCallEval>();
+    List<Target> otherAggregationFunctionTargets = new ArrayList<Target>();
+
+    //distinct columns -> GroupbyNode
+    Map<String, DistinctGroupbyNodeBuildInfo> distinctNodeBuildInfos = new 
HashMap<String, DistinctGroupbyNodeBuildInfo>();
+    AggregationFunctionCallEval[] aggFunctions = groupbyNode.getAggFunctions();
+    for (int aggIdx = 0; aggIdx < aggFunctions.length; aggIdx++) {
+      AggregationFunctionCallEval aggFunction = aggFunctions[aggIdx];
+      aggFunction.setFirstPhase();
+      Target originAggFunctionTarget = 
groupbyNode.getTargets()[originalGroupingColumns.size() + aggIdx];
+      Target aggFunctionTarget =
+          new Target(new 
FieldEval(originAggFunctionTarget.getEvalTree().getName(), 
aggFunction.getValueType()));
+
+      if (aggFunction.isDistinct()) {
+        // Create or reuse Groupby node for each Distinct expression.
+        LinkedHashSet<Column> groupbyUniqColumns = 
EvalTreeUtil.findUniqueColumns(aggFunction);
+        String groupbyMapKey = EvalTreeUtil.columnsToStr(groupbyUniqColumns);
+        DistinctGroupbyNodeBuildInfo buildInfo = 
distinctNodeBuildInfos.get(groupbyMapKey);
+        if (buildInfo == null) {
+          GroupbyNode distinctGroupbyNode = new 
GroupbyNode(context.getPlan().getLogicalPlan().newPID());
+          buildInfo = new DistinctGroupbyNodeBuildInfo(distinctGroupbyNode);
+          distinctNodeBuildInfos.put(groupbyMapKey, buildInfo);
+
+          // Grouping columns are GROUP BY clause's column + Distinct column.
+          List<Column> groupingColumns = new ArrayList<Column>();
+          for (Column eachGroupingColumn: groupbyUniqColumns) {
+            if (!groupingColumns.contains(eachGroupingColumn)) {
+              groupingColumns.add(eachGroupingColumn);
+            }
+          }
+          distinctGroupbyNode.setGroupingColumns(groupingColumns.toArray(new 
Column[]{}));
+        }
+        buildInfo.addAggFunction(aggFunction);
+        buildInfo.addAggFunctionTarget(aggFunctionTarget);
+      } else {
+        otherAggregationFunctionCallEvals.add(aggFunction);
+        otherAggregationFunctionTargets.add(aggFunctionTarget);
+      }
+    }
+
+    List<Target> baseGroupByTargets = new ArrayList<Target>();
+    baseGroupByTargets.add(new Target(new FieldEval(new Column("?distinctseq", 
Type.INT2))));
+    for (Column column : originalGroupingColumns) {
+      baseGroupByTargets.add(new Target(new FieldEval(column)));
+    }
+
+    //Add child groupby node for each Distinct clause
+    for (String eachKey: distinctNodeBuildInfos.keySet()) {
+      DistinctGroupbyNodeBuildInfo buildInfo = 
distinctNodeBuildInfos.get(eachKey);
+      GroupbyNode eachGroupbyNode = buildInfo.getGroupbyNode();
+      List<AggregationFunctionCallEval> groupbyAggFunctions = 
buildInfo.getAggFunctions();
+      String [] firstPhaseEvalNames = new String[groupbyAggFunctions.size()];
+      int index = 0;
+      for (AggregationFunctionCallEval eachCallEval: groupbyAggFunctions) {
+        firstPhaseEvalNames[index++] = eachCallEval.getName();
+      }
+
+      Target[] targets = new 
Target[eachGroupbyNode.getGroupingColumns().length + 
groupbyAggFunctions.size()];
+      int targetIdx = 0;
+
+      for (Column column : eachGroupbyNode.getGroupingColumns()) {
+        Target target = new Target(new FieldEval(column));
+        targets[targetIdx++] = target;
+        baseGroupByTargets.add(target);
+      }
+      for (Target eachAggFunctionTarget: buildInfo.getAggFunctionTargets()) {
+        targets[targetIdx++] = eachAggFunctionTarget;
+      }
+      eachGroupbyNode.setTargets(targets);
+      eachGroupbyNode.setAggFunctions(groupbyAggFunctions.toArray(new 
AggregationFunctionCallEval[]{}));
+      eachGroupbyNode.setDistinct(true);
+      eachGroupbyNode.setInSchema(groupbyNode.getInSchema());
+
+      childGroupbyNodes.add(eachGroupbyNode);
+    }
+
+    // Merge other aggregation function to a GroupBy Node.
+    if (!otherAggregationFunctionCallEvals.isEmpty()) {
+      // finally this aggregation output tuple's order is GROUP_BY_COL1, COL2, 
.... + AGG_VALUE, SUM_VALUE, ...
+      GroupbyNode otherGroupbyNode = new 
GroupbyNode(context.getPlan().getLogicalPlan().newPID());
+
+      Target[] targets = new Target[otherAggregationFunctionTargets.size()];
+      int targetIdx = 0;
+      for (Target eachTarget : otherAggregationFunctionTargets) {
+        targets[targetIdx++] = eachTarget;
+        baseGroupByTargets.add(eachTarget);
+      }
+
+      otherGroupbyNode.setTargets(targets);
+      otherGroupbyNode.setGroupingColumns(new Column[]{});
+      
otherGroupbyNode.setAggFunctions(otherAggregationFunctionCallEvals.toArray(new 
AggregationFunctionCallEval[]{}));
+      otherGroupbyNode.setInSchema(groupbyNode.getInSchema());
+
+      childGroupbyNodes.add(otherGroupbyNode);
+    }
+
+    DistinctGroupbyNode baseDistinctNode = new 
DistinctGroupbyNode(context.getPlan().getLogicalPlan().newPID());
+    baseDistinctNode.setTargets(baseGroupByTargets.toArray(new Target[]{}));
+    baseDistinctNode.setGroupColumns(groupbyNode.getGroupingColumns());
+    baseDistinctNode.setInSchema(groupbyNode.getInSchema());
+    baseDistinctNode.setChild(groupbyNode.getChild());
+
+    baseDistinctNode.setGroupbyNodes(childGroupbyNodes);
+
+    return baseDistinctNode;
+  }
 
   public ExecutionBlock buildPlan(GlobalPlanContext context,
                                   ExecutionBlock latestExecBlock,
@@ -66,7 +319,7 @@ public class DistinctGroupbyBuilder {
       DistinctGroupbyNode baseDistinctNode = 
buildBaseDistinctGroupByNode(context, latestExecBlock, groupbyNode);
 
       // Create First, SecondStage's Node using baseNode
-      DistinctGroupbyNode[] distinctNodes = createMultiPhaseDistinctNode(plan, 
groupbyNode, baseDistinctNode);
+      DistinctGroupbyNode[] distinctNodes = createTwoPhaseDistinctNode(plan, 
groupbyNode, baseDistinctNode);
 
       DistinctGroupbyNode firstStageDistinctNode = distinctNodes[0];
       DistinctGroupbyNode secondStageDistinctNode = distinctNodes[1];
@@ -100,7 +353,7 @@ public class DistinctGroupbyBuilder {
       context.getPlan().addConnect(channel);
 
       if (GlobalPlanner.hasUnionChild(firstStageDistinctNode)) {
-        globalPlanner.buildDistinctGroupbyAndUnionPlan(
+        buildDistinctGroupbyAndUnionPlan(
             context.getPlan(), latestExecBlock, firstStageDistinctNode, 
firstStageDistinctNode);
       }
 
@@ -162,6 +415,7 @@ public class DistinctGroupbyBuilder {
         buildInfo.addAggFunction(aggFunction);
         buildInfo.addAggFunctionTarget(aggFunctionTarget);
       } else {
+        aggFunction.setFinalPhase();
         otherAggregationFunctionCallEvals.add(aggFunction);
         otherAggregationFunctionTargets.add(aggFunctionTarget);
       }
@@ -224,7 +478,7 @@ public class DistinctGroupbyBuilder {
     return baseDistinctNode;
   }
 
-  public DistinctGroupbyNode[] createMultiPhaseDistinctNode(LogicalPlan plan,
+  public DistinctGroupbyNode[] createTwoPhaseDistinctNode(LogicalPlan plan,
                                                                    GroupbyNode 
originGroupbyNode,
                                                                    
DistinctGroupbyNode baseDistinctNode) {
     /*
@@ -456,6 +710,75 @@ public class DistinctGroupbyBuilder {
 
   }
 
+  private void setMultiStageAggregationEnforcer(
+      ExecutionBlock firstStageBlock, DistinctGroupbyNode 
firstStageDistinctNode,
+      ExecutionBlock secondStageBlock, DistinctGroupbyNode 
secondStageDistinctNode,
+      ExecutionBlock thirdStageBlock, DistinctGroupbyNode 
thirdStageDistinctNode) {
+    
firstStageBlock.getEnforcer().enforceDistinctAggregation(firstStageDistinctNode.getPID(),
+        true, MultipleAggregationStage.FIRST_STAGE,
+        DistinctAggregationAlgorithm.HASH_AGGREGATION, null);
+
+    
secondStageBlock.getEnforcer().enforceDistinctAggregation(secondStageDistinctNode.getPID(),
+        true, MultipleAggregationStage.SECOND_STAGE,
+        DistinctAggregationAlgorithm.HASH_AGGREGATION, null);
+
+    List<SortSpecArray> sortSpecArrays = new ArrayList<SortSpecArray>();
+    int index = 0;
+    for (GroupbyNode groupbyNode: firstStageDistinctNode.getGroupByNodes()) {
+      List<SortSpecProto> sortSpecs = new ArrayList<SortSpecProto>();
+      for (Column column: groupbyNode.getGroupingColumns()) {
+        
sortSpecs.add(SortSpecProto.newBuilder().setColumn(column.getProto()).build());
+      }
+      sortSpecArrays.add( SortSpecArray.newBuilder()
+          .setPid(thirdStageDistinctNode.getGroupByNodes().get(index).getPID())
+          .addAllSortSpecs(sortSpecs).build());
+    }
+    
thirdStageBlock.getEnforcer().enforceDistinctAggregation(thirdStageDistinctNode.getPID(),
+        true, MultipleAggregationStage.THRID_STAGE,
+        DistinctAggregationAlgorithm.SORT_AGGREGATION, sortSpecArrays);
+  }
+
+  private ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan 
masterPlan, ExecutionBlock lastBlock,
+                                                         DistinctGroupbyNode 
firstPhaseGroupBy,
+                                                         DistinctGroupbyNode 
secondPhaseGroupBy) {
+    DataChannel lastDataChannel = null;
+
+    // It pushes down the first phase group-by operator into all child blocks.
+    //
+    // (second phase)    G (currentBlock)
+    //                  /|\
+    //                / / | \
+    // (first phase) G G  G  G (child block)
+
+    // They are already connected one another.
+    // So, we don't need to connect them again.
+    for (DataChannel dataChannel : 
masterPlan.getIncomingChannels(lastBlock.getId())) {
+      if (firstPhaseGroupBy.isEmptyGrouping()) {
+        dataChannel.setShuffle(HASH_SHUFFLE, 
firstPhaseGroupBy.getGroupingColumns(), 1);
+      } else {
+        dataChannel.setShuffle(HASH_SHUFFLE, 
firstPhaseGroupBy.getGroupingColumns(), 32);
+      }
+      dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+      ExecutionBlock childBlock = 
masterPlan.getExecBlock(dataChannel.getSrcId());
+
+      // Why must firstPhaseGroupby be copied?
+      //
+      // A groupby in each execution block can have different child.
+      // It affects groupby's input schema.
+      DistinctGroupbyNode firstPhaseGroupbyCopy = 
PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
+      firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
+      childBlock.setPlan(firstPhaseGroupbyCopy);
+
+      // just keep the last data channel.
+      lastDataChannel = dataChannel;
+    }
+
+    ScanNode scanNode = 
GlobalPlanner.buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
+    secondPhaseGroupBy.setChild(scanNode);
+    lastBlock.setPlan(secondPhaseGroupBy);
+    return lastBlock;
+  }
+
   static class DistinctGroupbyNodeBuildInfo {
     private GroupbyNode groupbyNode;
     private List<AggregationFunctionCallEval> aggFunctions = new 
ArrayList<AggregationFunctionCallEval>();

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
index b1e4bc3..47e8933 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
@@ -27,10 +27,15 @@ import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.util.TUtil;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class DistinctGroupbyNode extends UnaryNode implements Projectable, 
Cloneable {
   @Expose
+  private GroupbyNode groupbyPlan;
+
+  @Expose
   private List<GroupbyNode> groupByNodes;
 
   @Expose
@@ -42,6 +47,9 @@ public class DistinctGroupbyNode extends UnaryNode implements 
Projectable, Clone
   @Expose
   private int[] resultColumnIds;
 
+  /** Aggregation Functions */
+  @Expose private AggregationFunctionCallEval [] aggrFunctions;
+
   public DistinctGroupbyNode(int pid) {
     super(pid, NodeType.DISTINCT_GROUP_BY);
   }
@@ -59,7 +67,11 @@ public class DistinctGroupbyNode extends UnaryNode 
implements Projectable, Clone
 
   @Override
   public Target[] getTargets() {
-    return new Target[0];
+    if (hasTargets()) {
+      return targets;
+    } else {
+      return new Target[0];
+    }
   }
 
   public void setGroupbyNodes(List<GroupbyNode> groupByNodes) {
@@ -86,6 +98,18 @@ public class DistinctGroupbyNode extends UnaryNode 
implements Projectable, Clone
     this.resultColumnIds = resultColumnIds;
   }
 
+  public AggregationFunctionCallEval [] getAggFunctions() {
+    return this.aggrFunctions;
+  }
+
+  public void setAggFunctions(AggregationFunctionCallEval[] evals) {
+    this.aggrFunctions = evals;
+  }
+
+  public void setGroupbyPlan(GroupbyNode groupbyPlan) { this.groupbyPlan = 
groupbyPlan; }
+
+  public GroupbyNode getGroupbyPlan() { return this.groupbyPlan; }
+
   @Override
   public Object clone() throws CloneNotSupportedException {
     DistinctGroupbyNode cloneNode = (DistinctGroupbyNode)super.clone();
@@ -113,6 +137,9 @@ public class DistinctGroupbyNode extends UnaryNode 
implements Projectable, Clone
       }
     }
 
+    if (groupbyPlan != null) {
+      cloneNode.groupbyPlan = (GroupbyNode)groupbyPlan.clone();
+    }
     return cloneNode;
   }
 
@@ -200,4 +227,27 @@ public class DistinctGroupbyNode extends UnaryNode 
implements Projectable, Clone
 
     return planStr;
   }
+
+  public Column[] getFirstStageShuffleKeyColumns() {
+    List<Column> shuffleKeyColumns = new ArrayList<Column>();
+    shuffleKeyColumns.add(getOutSchema().getColumn(0));   //distinctseq column
+    if (groupingColumns != null) {
+      for (Column eachColumn: groupingColumns) {
+        if (!shuffleKeyColumns.contains(eachColumn)) {
+          shuffleKeyColumns.add(eachColumn);
+        }
+      }
+    }
+    for (GroupbyNode eachGroupbyNode: groupByNodes) {
+      if (eachGroupbyNode.getGroupingColumns() != null && 
eachGroupbyNode.getGroupingColumns().length > 0) {
+        for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
+          if (!shuffleKeyColumns.contains(eachColumn)) {
+            shuffleKeyColumns.add(eachColumn);
+          }
+        }
+      }
+    }
+
+    return shuffleKeyColumns.toArray(new Column[]{});
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
new file mode 100644
index 0000000..7201ed4
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Int2Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * This class incremented each row to more rows by grouping columns. In 
addition, the operator must creates each row
+ * because of aggregation non-distinct columns.
+ *
+ * For example, there is a query as follows:
+ *  select sum(distinct l_orderkey), l_linenumber, l_returnflag, l_linestatus, 
l_shipdate,
+ *         count(distinct l_partkey), sum(l_orderkey)
+ *  from lineitem
+ *  group by l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+ *
+ *  If you execute above query on tajo, FileScanner makes tuples after 
scanning raw data as follows:
+ *
+ *  
-----------------------------------------------------------------------------
+ *  l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_orderkey, l_partkey
+ *  
-----------------------------------------------------------------------------
+ *  1, N, O, 1996-03-13, 1, 1
+ *  2, N, O, 1996-04-12, 1, 1
+ *  1, N, O, 1997-01-28, 2, 2
+ *  1, R, F, 1994-02-02, 3, 2
+ *  2, R, F, 1993-11-09, 3, 3
+ *  
+ *  And then the scanner will push it as input data to this class. After then, 
this class will makes output data as
+ *  follows:
+ *
+ *  
-------------------------------------------------------------------------------------------------------------------
+ *  NodeSequence, l_linenumber, l_returnflag, l_linestatus, l_shipdate, 
l_partkey for distinct,
+ *  l_orderkey for distinct, l_orderkey for nondistinct
+ *  
-------------------------------------------------------------------------------------------------------------------
+ *  0, 2, R, F, 1993-11-09, 3, NULL, 3
+ *  0, 2, N, O, 1996-04-12, 1, NULL, 1
+ *  0, 1, N, O, 1997-01-28, 2, NULL, 2
+ *  0, 1, R, F, 1994-02-02, 2, NULL, 3
+ *  0, 1, N, O, 1996-03-13, 1, NULL, 1
+ *  1, 2, R, F, 1993-11-09, NULL, 3, NULL
+ *  1, 2, N, O, 1996-04-12, NULL, 1, NULL
+ *  1, 1, N, O, 1997-01-28, NULL, 2, NULL
+ *  1, 1, R, F, 1994-02-02, NULL, 3, NULL
+ *  1, 1, N, O, 1996-03-13, NULL, 1, NULL
+ *
+ *  For reference, NodeSequence means GroupByNode sequence. In this case, 
there are two GroupByNode. And it consist
+ *  of lineitem.l_partkey and lineitem.l_orderkey. The NodeSequence of 
lineitem.l_partkey is zero and the sequence of
+ *  lineitem.l_orderkey is one. As above output data, If there are 
uncomfortable column for DistinctGroupBy, 
+ *  inner aggregator makes it to NullDataTum.
+ *  
+ *  In addition, columns for NonDistinctGroupBy only can contains real value 
at first NodeSequence.
+ *
+ */
+
+public class DistinctGroupbyFirstAggregationExec extends PhysicalExec {
+  private static Log LOG = 
LogFactory.getLog(DistinctGroupbyFirstAggregationExec.class);
+
+  private DistinctGroupbyNode plan;
+  private boolean finished = false;
+  private boolean preparedData = false;
+  private PhysicalExec child;
+
+  private long totalNumRows;
+  private int fetchedRows;
+  private float progress;
+
+  private int[] groupingKeyIndexes;
+  private NonDistinctHashAggregator nonDistinctHashAggregator;
+  private DistinctHashAggregator[] distinctAggregators;
+
+  private int resultTupleLength;
+
+  public DistinctGroupbyFirstAggregationExec(TaskAttemptContext context, 
DistinctGroupbyNode plan, PhysicalExec subOp)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+    this.child = subOp;
+    this.plan = plan;
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+    child.init();
+
+    // finding grouping column index
+    Column[] groupingColumns = plan.getGroupingColumns();
+    groupingKeyIndexes = new int[groupingColumns.length];
+
+    int index = 0;
+    for (Column col: groupingColumns) {
+      int keyIndex;
+      if (col.hasQualifier()) {
+        keyIndex = inSchema.getColumnId(col.getQualifiedName());
+      } else {
+        keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+      }
+      groupingKeyIndexes[index++] = keyIndex;
+    }
+    resultTupleLength = groupingKeyIndexes.length + 1;  //1 is Sequence Datum 
which indicates sequence of DistinctNode.
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    List<DistinctHashAggregator> distinctAggrList = new 
ArrayList<DistinctHashAggregator>();
+    int distinctSeq = 0;
+    for (GroupbyNode eachGroupby: groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        DistinctHashAggregator aggregator = new 
DistinctHashAggregator(eachGroupby);
+        aggregator.setNodeSequence(distinctSeq++);
+        distinctAggrList.add(aggregator);
+        resultTupleLength += aggregator.getTupleLength();
+      } else {
+        nonDistinctHashAggregator = new NonDistinctHashAggregator(eachGroupby);
+        resultTupleLength += nonDistinctHashAggregator.getTupleLength();
+      }
+    }
+    distinctAggregators = distinctAggrList.toArray(new 
DistinctHashAggregator[]{});
+  }
+
+  private int currentAggregatorIndex = 0;
+
+  @Override
+  public Tuple next() throws IOException {
+    if (!preparedData) {
+      prepareInputData();
+    }
+
+    int prevIndex = currentAggregatorIndex;
+    while (!context.isStopped()) {
+      DistinctHashAggregator aggregator = 
distinctAggregators[currentAggregatorIndex];
+      Tuple result = aggregator.next();
+      if (result != null) {
+        return result;
+      }
+      currentAggregatorIndex++;
+      currentAggregatorIndex = currentAggregatorIndex % 
distinctAggregators.length;
+      if (currentAggregatorIndex == prevIndex) {
+        finished = true;
+        return null;
+      }
+    }
+
+    return null;
+  }
+
+  private void prepareInputData() throws IOException {
+    Tuple tuple = null;
+    while(!context.isStopped() && (tuple = child.next()) != null) {
+      Tuple groupingKey = new VTuple(groupingKeyIndexes.length);
+      for (int i = 0; i < groupingKeyIndexes.length; i++) {
+        groupingKey.put(i, tuple.get(groupingKeyIndexes[i]));
+      }
+      for (int i = 0; i < distinctAggregators.length; i++) {
+        distinctAggregators[i].compute(groupingKey, tuple);
+      }
+      if (nonDistinctHashAggregator != null) {
+        nonDistinctHashAggregator.compute(groupingKey, tuple);
+      }
+    }
+    for (int i = 0; i < distinctAggregators.length; i++) {
+      distinctAggregators[i].rescan();
+    }
+
+    totalNumRows = distinctAggregators[0].distinctAggrDatas.size();
+    preparedData = true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    child.close();
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (child != null) {
+      return child.getInputStats();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    if (finished) {
+      return progress;
+    } else {
+      if (totalNumRows > 0) {
+        return progress + ((float)fetchedRows / (float)totalNumRows) * 0.5f;
+      } else {
+        return progress;
+      }
+    }
+  }
+
+  @Override
+  public void rescan() {
+    finished = false;
+    currentAggregatorIndex = 0;
+    for (int i = 0; i < distinctAggregators.length; i++) {
+      distinctAggregators[i].rescan();
+    }
+  }
+
+  class NonDistinctHashAggregator {
+    private GroupbyNode groupbyNode;
+    private int aggFunctionsNum;
+    private final AggregationFunctionCallEval aggFunctions[];
+
+    // GroupingKey -> FunctionContext[]
+    private Map<Tuple, FunctionContext[]> nonDistinctAggrDatas;
+    private int tupleLength;
+
+    private Tuple dummyTuple;
+    private NonDistinctHashAggregator(GroupbyNode groupbyNode) throws 
IOException {
+      this.groupbyNode = groupbyNode;
+
+      nonDistinctAggrDatas = new HashMap<Tuple, FunctionContext[]>();
+
+      if (groupbyNode.hasAggFunctions()) {
+        aggFunctions = groupbyNode.getAggFunctions();
+        aggFunctionsNum = aggFunctions.length;
+        for (AggregationFunctionCallEval eachFunction: aggFunctions) {
+          eachFunction.setFirstPhase();
+        }
+      } else {
+        aggFunctions = new AggregationFunctionCallEval[0];
+        aggFunctionsNum = 0;
+      }
+
+      dummyTuple = new VTuple(aggFunctionsNum);
+      for (int i = 0; i < aggFunctionsNum; i++) {
+        dummyTuple.put(i, NullDatum.get());
+      }
+      tupleLength = aggFunctionsNum;
+    }
+
+    public void compute(Tuple groupingKeyTuple, Tuple tuple) {
+      FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKeyTuple);
+      if (contexts != null) {
+        for (int i = 0; i < aggFunctions.length; i++) {
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
+        }
+      } else { // if the key occurs firstly
+        contexts = new FunctionContext[aggFunctionsNum];
+        for (int i = 0; i < aggFunctionsNum; i++) {
+          contexts[i] = aggFunctions[i].newContext();
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
+        }
+        nonDistinctAggrDatas.put(groupingKeyTuple, contexts);
+      }
+    }
+
+    public Tuple aggregate(Tuple groupingKey) {
+      FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKey);
+      if (contexts == null) {
+        return null;
+      }
+      Tuple tuple = new VTuple(aggFunctionsNum);
+
+      for (int i = 0; i < aggFunctionsNum; i++) {
+        tuple.put(i, aggFunctions[i].terminate(contexts[i]));
+      }
+
+      return tuple;
+    }
+
+    public int getTupleLength() {
+      return tupleLength;
+    }
+
+    public Tuple getDummyTuple() {
+      return dummyTuple;
+    }
+  }
+
+  class DistinctHashAggregator {
+    private GroupbyNode groupbyNode;
+
+    // GroupingKey -> DistinctKey
+    private Map<Tuple, Set<Tuple>> distinctAggrDatas;
+    private Iterator<Entry<Tuple, Set<Tuple>>> iterator = null;
+
+    private int nodeSequence;
+    private Int2Datum nodeSequenceDatum;
+
+    private int[] distinctKeyIndexes;
+
+    private int tupleLength;
+    private Tuple dummyTuple;
+    private boolean aggregatorFinished = false;
+
+    public DistinctHashAggregator(GroupbyNode groupbyNode) throws IOException {
+      this.groupbyNode = groupbyNode;
+
+      Set<Integer> groupingKeyIndexSet = new HashSet<Integer>();
+      for (Integer eachIndex: groupingKeyIndexes) {
+        groupingKeyIndexSet.add(eachIndex);
+      }
+
+      List<Integer> distinctGroupingKeyIndexSet = new ArrayList<Integer>();
+      Column[] groupingColumns = groupbyNode.getGroupingColumns();
+      for (int idx = 0; idx < groupingColumns.length; idx++) {
+        Column col = groupingColumns[idx];
+        int keyIndex;
+        if (col.hasQualifier()) {
+          keyIndex = inSchema.getColumnId(col.getQualifiedName());
+        } else {
+          keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+        }
+        if (!groupingKeyIndexSet.contains(keyIndex)) {
+          distinctGroupingKeyIndexSet.add(keyIndex);
+        }
+      }
+      int index = 0;
+      this.distinctKeyIndexes = new int[distinctGroupingKeyIndexSet.size()];
+      this.dummyTuple = new VTuple(distinctGroupingKeyIndexSet.size());
+      for (Integer eachId : distinctGroupingKeyIndexSet) {
+        this.dummyTuple.put(index, NullDatum.get());
+        this.distinctKeyIndexes[index++] = eachId;
+      }
+
+      this.distinctAggrDatas = new HashMap<Tuple, Set<Tuple>>();
+      this.tupleLength = distinctKeyIndexes.length;
+    }
+
+    public void setNodeSequence(int nodeSequence) {
+      this.nodeSequence = nodeSequence;
+      this.nodeSequenceDatum = new Int2Datum((short)nodeSequence);
+    }
+
+    public int getTupleLength() {
+      return tupleLength;
+    }
+
+    public void compute(Tuple groupingKey, Tuple tuple) throws IOException {
+      Tuple distinctKeyTuple = new VTuple(distinctKeyIndexes.length);
+      for (int i = 0; i < distinctKeyIndexes.length; i++) {
+        distinctKeyTuple.put(i, tuple.get(distinctKeyIndexes[i]));
+      }
+
+      Set<Tuple> distinctEntry = distinctAggrDatas.get(groupingKey);
+      if (distinctEntry == null) {
+        distinctEntry = new HashSet<Tuple>();
+        distinctAggrDatas.put(groupingKey, distinctEntry);
+      }
+      distinctEntry.add(distinctKeyTuple);
+    }
+
+    public void rescan() {
+      iterator = distinctAggrDatas.entrySet().iterator();
+      currentGroupingTuples = null;
+      groupingKeyChanged = false;
+      aggregatorFinished = false;
+    }
+
+    public void close() throws IOException {
+      distinctAggrDatas.clear();
+      distinctAggrDatas = null;
+      currentGroupingTuples = null;
+      iterator = null;
+    }
+
+    Entry<Tuple, Set<Tuple>> currentGroupingTuples;
+    Iterator<Tuple> distinctKeyIterator;
+    boolean groupingKeyChanged = false;
+
+    public Tuple next() {
+      if (aggregatorFinished) {
+        return null;
+      }
+      if (currentGroupingTuples == null) {
+        // first
+        if (!iterator.hasNext()) {
+          // Empty case
+          aggregatorFinished = true;
+          return null;
+        }
+        currentGroupingTuples = iterator.next();
+        groupingKeyChanged = true;
+        distinctKeyIterator = currentGroupingTuples.getValue().iterator();
+      }
+      if (!distinctKeyIterator.hasNext()) {
+        if (!iterator.hasNext()) {
+          aggregatorFinished = true;
+          return null;
+        }
+        currentGroupingTuples = iterator.next();
+        groupingKeyChanged = true;
+        distinctKeyIterator = currentGroupingTuples.getValue().iterator();
+      }
+      // node sequence, groupingKeys, 1'st distinctKeys, 2'st distinctKeys, ...
+      // If n'st == this.nodeSequence set with real data, otherwise set with 
NullDatum
+      Tuple tuple = new VTuple(resultTupleLength);
+      int tupleIndex = 0;
+      tuple.put(tupleIndex++, nodeSequenceDatum);
+
+      // merge grouping key
+      Tuple groupingKeyTuple = currentGroupingTuples.getKey();
+      int groupingKeyLength = groupingKeyTuple.size();
+      for (int i = 0; i < groupingKeyLength; i++, tupleIndex++) {
+        tuple.put(tupleIndex, groupingKeyTuple.get(i));
+      }
+
+      // merge distinctKey
+      for (int i = 0; i < distinctAggregators.length; i++) {
+        if (i == nodeSequence) {
+          Tuple distinctKeyTuple = distinctKeyIterator.next();
+          int distinctKeyLength = distinctKeyTuple.size();
+          for (int j = 0; j < distinctKeyLength; j++, tupleIndex++) {
+            tuple.put(tupleIndex, distinctKeyTuple.get(j));
+          }
+        } else {
+          Tuple dummyTuple = distinctAggregators[i].getDummyTuple();
+          int dummyTupleSize = dummyTuple.size();
+          for (int j = 0; j < dummyTupleSize; j++, tupleIndex++) {
+            tuple.put(tupleIndex, dummyTuple.get(j));
+          }
+        }
+      }
+
+      // merge non distinct aggregation tuple
+      if (nonDistinctHashAggregator != null) {
+        Tuple nonDistinctTuple;
+        if (nodeSequence == 0 && groupingKeyChanged) {
+          groupingKeyChanged = false;
+          nonDistinctTuple = 
nonDistinctHashAggregator.aggregate(groupingKeyTuple);
+          if (nonDistinctTuple == null) {
+            nonDistinctTuple = nonDistinctHashAggregator.getDummyTuple();
+          }
+        } else {
+          nonDistinctTuple = nonDistinctHashAggregator.getDummyTuple();
+        }
+        int tupleSize = nonDistinctTuple.size();
+        for (int j = 0; j < tupleSize; j++, tupleIndex++) {
+          tuple.put(tupleIndex, nonDistinctTuple.get(j));
+        }
+      }
+      return tuple;
+    }
+
+    public Tuple getDummyTuple() {
+      return dummyTuple;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
new file mode 100644
index 0000000..bc8885f
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
@@ -0,0 +1,295 @@
+  /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class adjusts shuffle columns between 
DistinctGroupbyFirstAggregationExec and
+ * DistinctGroupbyThirdAggregationExec. It shuffled by grouping columns and 
aggregation columns. Because of the
+ * shuffle, more DistinctGroupbyThirdAggregationExec will execute compare than 
previous two distinct group by
+ * algorithm. And then, many DistinctGroupbyThirdAggregationExec improve the 
performance of count distinct query.
+ *
+ * For example, there is a query as follows:
+ *  select sum(distinct l_orderkey), l_linenumber, l_returnflag, l_linestatus, 
l_shipdate,
+ *         count(distinct l_partkey), sum(l_orderkey)
+ *  from lineitem
+ *  group by l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+ *
+ *  In this case, execution plan for this operator will set shuffle type as 
follows:
+ *    Incoming: 1 => 2 (type=HASH_SHUFFLE, key=?distinctseq (INT2), 
default.lineitem.l_linenumber (INT4),
+ *      default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus 
(TEXT), default.lineitem.l_shipdate (TEXT),
+ *     default.lineitem.l_partkey (INT4), default.lineitem.l_orderkey (INT4), 
num=32)
+ *
+ *    Outgoing: 2 => 3 (type=HASH_SHUFFLE, key=default.lineitem.l_linenumber 
(INT4),
+ *      default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus 
(TEXT),
+ *      default.lineitem.l_shipdate (TEXT), num=32)
+ *
+ *  For reference, input data and output data results as follows:
+ *
+ *  
-------------------------------------------------------------------------------------------------------------------
+ *  NodeSequence, l_linenumber, l_returnflag, l_linestatus, l_shipdate, 
l_partkey for distinct,
+ *  l_orderkey for distinct, l_orderkey for nondistinct
+ *  
-------------------------------------------------------------------------------------------------------------------
+ *  0, 2, R, F, 1993-11-09, 3, NULL, 3
+ *  0, 2, N, O, 1996-04-12, 1, NULL, 1
+ *  0, 1, N, O, 1997-01-28, 2, NULL, 2
+ *  0, 1, R, F, 1994-02-02, 2, NULL, 3
+ *  0, 1, N, O, 1996-03-13, 1, NULL, 1
+ *  1, 2, R, F, 1993-11-09, NULL, 3, NULL
+ *  1, 2, N, O, 1996-04-12, NULL, 1, NULL
+ *  1, 1, N, O, 1997-01-28, NULL, 2, NULL
+ *  1, 1, R, F, 1994-02-02, NULL, 3, NULL
+ *  1, 1, N, O, 1996-03-13, NULL, 1, NULL
+ *
+ */
+public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
+  private static Log LOG = 
LogFactory.getLog(DistinctGroupbySecondAggregationExec.class);
+  private DistinctGroupbyNode plan;
+  private PhysicalExec child;
+
+  private boolean finished = false;
+
+  private int numGroupingColumns;
+  private int[][] distinctKeyIndexes;
+  private FunctionContext[] nonDistinctAggrContexts;
+  private AggregationFunctionCallEval[] nonDistinctAggrFunctions;
+  private int nonDistinctAggrTupleStartIndex = -1;
+
+  public DistinctGroupbySecondAggregationExec(TaskAttemptContext context, 
DistinctGroupbyNode plan, SortExec sortExec)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
+    this.plan = plan;
+    this.child = sortExec;
+  }
+
+  @Override
+  public void init() throws IOException {
+    this.child.init();
+
+    numGroupingColumns = plan.getGroupingColumns().length;
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    // Finding distinct group by column index.
+    Set<Integer> groupingKeyIndexSet = new HashSet<Integer>();
+    for (Column col: plan.getGroupingColumns()) {
+      int keyIndex;
+      if (col.hasQualifier()) {
+        keyIndex = inSchema.getColumnId(col.getQualifiedName());
+      } else {
+        keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+      }
+      groupingKeyIndexSet.add(keyIndex);
+    }
+
+    int numDistinct = 0;
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        numDistinct++;
+      } else {
+        nonDistinctAggrFunctions = eachGroupby.getAggFunctions();
+        if (nonDistinctAggrFunctions != null) {
+          for (AggregationFunctionCallEval eachFunction: 
nonDistinctAggrFunctions) {
+            eachFunction.setIntermediatePhase();
+          }
+          nonDistinctAggrContexts = new 
FunctionContext[nonDistinctAggrFunctions.length];
+        }
+      }
+    }
+
+    int index = 0;
+    distinctKeyIndexes = new int[numDistinct][];
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        List<Integer> distinctGroupingKeyIndex = new ArrayList<Integer>();
+        Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns();
+        for (int idx = 0; idx < distinctGroupingColumns.length; idx++) {
+          Column col = distinctGroupingColumns[idx];
+          int keyIndex;
+          if (col.hasQualifier()) {
+            keyIndex = inSchema.getColumnId(col.getQualifiedName());
+          } else {
+            keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+          }
+          if (!groupingKeyIndexSet.contains(keyIndex)) {
+            distinctGroupingKeyIndex.add(keyIndex);
+          }
+        }
+        int i = 0;
+        distinctKeyIndexes[index] = new int[distinctGroupingKeyIndex.size()];
+        for (int eachIdx : distinctGroupingKeyIndex) {
+          distinctKeyIndexes[index][i++] = eachIdx;
+        }
+        index++;
+      }
+    }
+    if (nonDistinctAggrFunctions != null) {
+      nonDistinctAggrTupleStartIndex = inSchema.size() - 
nonDistinctAggrFunctions.length;
+    }
+  }
+
+  Tuple prevKeyTuple = null;
+  Tuple prevTuple = null;
+  int prevSeq = -1;
+
+  @Override
+  public Tuple next() throws IOException {
+    if (finished) {
+      return null;
+    }
+
+    Tuple result = null;
+    while (!context.isStopped()) {
+      Tuple childTuple = child.next();
+      if (childTuple == null) {
+        finished = true;
+
+        if (prevTuple == null) {
+          // Empty case
+          return null;
+        }
+        if (prevSeq == 0 && nonDistinctAggrFunctions != null) {
+          terminatedNonDistinctAggr(prevTuple);
+        }
+        result = prevTuple;
+        break;
+      }
+
+      Tuple tuple = null;
+      try {
+        tuple = childTuple.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+
+      int distinctSeq = tuple.get(0).asInt2();
+      Tuple keyTuple = getKeyTuple(distinctSeq, tuple);
+
+      if (prevKeyTuple == null) {
+        // First
+        if (distinctSeq == 0 && nonDistinctAggrFunctions != null) {
+          initNonDistinctAggrContext();
+          mergeNonDistinctAggr(tuple);
+        }
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+        prevSeq = distinctSeq;
+        continue;
+      }
+
+      if (!prevKeyTuple.equals(keyTuple)) {
+        // new grouping key
+        if (prevSeq == 0 && nonDistinctAggrFunctions != null) {
+          terminatedNonDistinctAggr(prevTuple);
+        }
+        result = prevTuple;
+
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+        prevSeq = distinctSeq;
+
+        if (distinctSeq == 0 && nonDistinctAggrFunctions != null) {
+          initNonDistinctAggrContext();
+          mergeNonDistinctAggr(tuple);
+        }
+        break;
+      } else {
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+        prevSeq = distinctSeq;
+        if (distinctSeq == 0 && nonDistinctAggrFunctions != null) {
+          mergeNonDistinctAggr(tuple);
+        }
+      }
+    }
+
+    return result;
+  }
+
+  private void initNonDistinctAggrContext() {
+    if (nonDistinctAggrFunctions != null) {
+      nonDistinctAggrContexts = new 
FunctionContext[nonDistinctAggrFunctions.length];
+      for (int i = 0; i < nonDistinctAggrFunctions.length; i++) {
+        nonDistinctAggrContexts[i] = nonDistinctAggrFunctions[i].newContext();
+      }
+    }
+  }
+
+  private void mergeNonDistinctAggr(Tuple tuple) {
+    if (nonDistinctAggrFunctions == null) {
+      return;
+    }
+    for (int i = 0; i < nonDistinctAggrFunctions.length; i++) {
+      nonDistinctAggrFunctions[i].merge(nonDistinctAggrContexts[i], inSchema, 
tuple);
+    }
+  }
+
+  private void terminatedNonDistinctAggr(Tuple tuple) {
+    if (nonDistinctAggrFunctions == null) {
+      return;
+    }
+    for (int i = 0; i < nonDistinctAggrFunctions.length; i++) {
+      tuple.put(nonDistinctAggrTupleStartIndex + i, 
nonDistinctAggrFunctions[i].terminate(nonDistinctAggrContexts[i]));
+    }
+  }
+
+  private Tuple getKeyTuple(int distinctSeq, Tuple tuple) {
+    int[] columnIndexes = distinctKeyIndexes[distinctSeq];
+
+    Tuple keyTuple = new VTuple(numGroupingColumns + columnIndexes.length + 1);
+    keyTuple.put(0, tuple.get(0));
+    for (int i = 0; i < numGroupingColumns; i++) {
+      keyTuple.put(i + 1, tuple.get(i + 1));
+    }
+    for (int i = 0; i < columnIndexes.length; i++) {
+      keyTuple.put(i + 1 + numGroupingColumns, tuple.get(columnIndexes[i]));
+    }
+
+    return keyTuple;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    prevKeyTuple = null;
+    prevTuple = null;
+    finished = false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
new file mode 100644
index 0000000..239dabf
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ *  This class aggregates the output of DistinctGroupbySecondAggregationExec.
+ *
+ */
+public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
+  private static Log LOG = 
LogFactory.getLog(DistinctGroupbyThirdAggregationExec.class);
+  private DistinctGroupbyNode plan;
+  private PhysicalExec child;
+
+  private boolean finished = false;
+
+  private DistinctFinalAggregator[] aggregators;
+  private DistinctFinalAggregator nonDistinctAggr;
+
+  private int resultTupleLength;
+  private int numGroupingColumns;
+
+  private int[] resultTupleIndexes;
+
+  public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, 
DistinctGroupbyNode plan, SortExec sortExec)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
+    this.plan = plan;
+    this.child = sortExec;
+  }
+
+  @Override
+  public void init() throws IOException {
+    this.child.init();
+
+    numGroupingColumns = plan.getGroupingColumns().length;
+    resultTupleLength = numGroupingColumns;
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+    List<DistinctFinalAggregator> aggregatorList = new 
ArrayList<DistinctFinalAggregator>();
+    int inTupleIndex = 1 + numGroupingColumns;
+    int outTupleIndex = numGroupingColumns;
+    int distinctSeq = 0;
+
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      if (eachGroupby.isDistinct()) {
+        aggregatorList.add(new DistinctFinalAggregator(distinctSeq, 
inTupleIndex, outTupleIndex, eachGroupby));
+        distinctSeq++;
+
+        Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns();
+        inTupleIndex += distinctGroupingColumns.length;
+        outTupleIndex += eachGroupby.getAggFunctions().length;
+      } else {
+        nonDistinctAggr = new DistinctFinalAggregator(-1, inTupleIndex, 
outTupleIndex, eachGroupby);
+        outTupleIndex += eachGroupby.getAggFunctions().length;
+      }
+      resultTupleLength += eachGroupby.getAggFunctions().length;
+    }
+    aggregators = aggregatorList.toArray(new DistinctFinalAggregator[]{});
+
+    // make output schema mapping index
+    resultTupleIndexes = new int[outSchema.size()];
+    Map<Column, Integer> groupbyResultTupleIndex = new HashMap<Column, 
Integer>();
+    int resultTupleIndex = 0;
+    for (Column eachColumn: plan.getGroupingColumns()) {
+      groupbyResultTupleIndex.put(eachColumn, resultTupleIndex);
+      resultTupleIndex++;
+    }
+    for (GroupbyNode eachGroupby : groupbyNodes) {
+      Set<Column> groupingColumnSet = new HashSet<Column>();
+      for (Column column: eachGroupby.getGroupingColumns()) {
+        groupingColumnSet.add(column);
+      }
+      for (Target eachTarget: eachGroupby.getTargets()) {
+        if (!groupingColumnSet.contains(eachTarget.getNamedColumn())) {
+          //aggr function
+          groupbyResultTupleIndex.put(eachTarget.getNamedColumn(), 
resultTupleIndex);
+          resultTupleIndex++;
+        }
+      }
+    }
+
+    int index = 0;
+    for (Column eachOutputColumn: outSchema.getColumns()) {
+      // If column is avg aggregation function, outschema's column type is 
float
+      // but groupbyResultTupleIndex's column type is protobuf
+
+      int matchedIndex = -1;
+      for (Column eachIndexColumn: groupbyResultTupleIndex.keySet()) {
+        if 
(eachIndexColumn.getQualifiedName().equals(eachOutputColumn.getQualifiedName()))
 {
+          matchedIndex = groupbyResultTupleIndex.get(eachIndexColumn);
+          break;
+        }
+      }
+      if (matchedIndex < 0) {
+        throw new IOException("Can't find proper output column mapping: " + 
eachOutputColumn);
+      }
+      resultTupleIndexes[matchedIndex] = index++;
+    }
+  }
+
+  Tuple prevKeyTuple = null;
+  Tuple prevTuple = null;
+
+  @Override
+  public Tuple next() throws IOException {
+    if (finished) {
+      return null;
+    }
+
+    Tuple resultTuple = new VTuple(resultTupleLength);
+
+    while (!context.isStopped()) {
+      Tuple childTuple = child.next();
+      // Last tuple
+      if (childTuple == null) {
+        finished = true;
+
+        if (prevTuple == null) {
+          // Empty case
+          if (numGroupingColumns == 0) {
+            // No grouping column, return null tuple
+            return makeEmptyTuple();
+          } else {
+            return null;
+          }
+        }
+
+        for (int i = 0; i < numGroupingColumns; i++) {
+          resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
+        }
+        for (DistinctFinalAggregator eachAggr: aggregators) {
+          eachAggr.terminate(resultTuple);
+        }
+        break;
+      }
+
+      Tuple tuple = null;
+      try {
+        tuple = childTuple.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+
+      int distinctSeq = tuple.get(0).asInt2();
+      Tuple keyTuple = getGroupingKeyTuple(tuple);
+
+      // First tuple
+      if (prevKeyTuple == null) {
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+
+        aggregators[distinctSeq].merge(tuple);
+        continue;
+      }
+
+      if (!prevKeyTuple.equals(keyTuple)) {
+        // new grouping key
+        for (int i = 0; i < numGroupingColumns; i++) {
+          resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
+        }
+        for (DistinctFinalAggregator eachAggr: aggregators) {
+          eachAggr.terminate(resultTuple);
+        }
+
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+
+        aggregators[distinctSeq].merge(tuple);
+        break;
+      } else {
+        prevKeyTuple = keyTuple;
+        prevTuple = tuple;
+        aggregators[distinctSeq].merge(tuple);
+      }
+    }
+
+    return resultTuple;
+  }
+
+  private Tuple makeEmptyTuple() {
+    Tuple resultTuple = new VTuple(resultTupleLength);
+    for (DistinctFinalAggregator eachAggr: aggregators) {
+      eachAggr.terminateEmpty(resultTuple);
+    }
+
+    return resultTuple;
+  }
+
+  private Tuple getGroupingKeyTuple(Tuple tuple) {
+    Tuple keyTuple = new VTuple(numGroupingColumns);
+    for (int i = 0; i < numGroupingColumns; i++) {
+      keyTuple.put(i, tuple.get(i + 1));
+    }
+
+    return keyTuple;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    prevKeyTuple = null;
+    prevTuple = null;
+    finished = false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  class DistinctFinalAggregator {
+    private FunctionContext[] functionContexts;
+    private AggregationFunctionCallEval[] aggrFunctions;
+    private int seq;
+    private int inTupleIndex;
+    private int outTupleIndex;
+    public DistinctFinalAggregator(int seq, int inTupleIndex, int 
outTupleIndex, GroupbyNode groupbyNode) {
+      this.seq = seq;
+      this.inTupleIndex = inTupleIndex;
+      this.outTupleIndex = outTupleIndex;
+
+      aggrFunctions = groupbyNode.getAggFunctions();
+      if (aggrFunctions != null) {
+        for (AggregationFunctionCallEval eachFunction: aggrFunctions) {
+          eachFunction.setFinalPhase();
+        }
+      }
+      newFunctionContext();
+    }
+
+    private void newFunctionContext() {
+      functionContexts = new FunctionContext[aggrFunctions.length];
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        functionContexts[i] = aggrFunctions[i].newContext();
+      }
+    }
+
+    public void merge(Tuple tuple) {
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        aggrFunctions[i].merge(functionContexts[i], inSchema, tuple);
+      }
+
+      if (seq == 0 && nonDistinctAggr != null) {
+        if (!tuple.get(nonDistinctAggr.inTupleIndex).isNull()) {
+          nonDistinctAggr.merge(tuple);
+        }
+      }
+    }
+
+    public void terminate(Tuple resultTuple) {
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        resultTuple.put(resultTupleIndexes[outTupleIndex + i], 
aggrFunctions[i].terminate(functionContexts[i]));
+      }
+      newFunctionContext();
+
+      if (seq == 0 && nonDistinctAggr != null) {
+        nonDistinctAggr.terminate(resultTuple);
+      }
+    }
+
+    public void terminateEmpty(Tuple resultTuple) {
+      newFunctionContext();
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        resultTuple.put(resultTupleIndexes[outTupleIndex + i], 
aggrFunctions[i].terminate(functionContexts[i]));
+      }
+      if (seq == 0 && nonDistinctAggr != null) {
+        nonDistinctAggr.terminateEmpty(resultTuple);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 4deddee..598054c 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -33,10 +33,8 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
-import org.apache.tajo.engine.planner.UniformRangePartition;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
@@ -45,6 +43,8 @@ import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.master.TaskSchedulerContext;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.storage.AbstractStorageManager;
@@ -799,13 +799,30 @@ public class Repartitioner {
     }
 
     int groupingColumns = 0;
-    GroupbyNode groupby = 
PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), 
NodeType.GROUP_BY);
-    if (groupby != null) {
-      groupingColumns = groupby.getGroupingColumns().length;
-    } else {
-      DistinctGroupbyNode dGroupby = 
PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), 
NodeType.DISTINCT_GROUP_BY);
-      if (dGroupby != null) {
-        groupingColumns = dGroupby.getGroupingColumns().length;
+    LogicalNode[] groupbyNodes = 
PlannerUtil.findAllNodes(subQuery.getBlock().getPlan(),
+        new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY});
+    if (groupbyNodes != null && groupbyNodes.length > 0) {
+      LogicalNode bottomNode = groupbyNodes[0];
+      if (bottomNode.getType() == NodeType.GROUP_BY) {
+        groupingColumns = 
((GroupbyNode)bottomNode).getGroupingColumns().length;
+      } else if (bottomNode.getType() == NodeType.DISTINCT_GROUP_BY) {
+        DistinctGroupbyNode distinctNode = 
PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), 
NodeType.DISTINCT_GROUP_BY);
+        if (distinctNode == null) {
+          LOG.warn(subQuery.getId() + ", Can't find current 
DistinctGroupbyNode");
+          distinctNode = (DistinctGroupbyNode)bottomNode;
+        }
+        groupingColumns = distinctNode.getGroupingColumns().length;
+
+        Enforcer enforcer = execBlock.getEnforcer();
+        EnforceProperty property = 
PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
+        if (property != null) {
+          if (property.getDistinct().getIsMultipleAggregation()) {
+            MultipleAggregationStage stage = 
property.getDistinct().getMultipleAggregationStage();
+            if (stage != MultipleAggregationStage.THRID_STAGE) {
+              groupingColumns = distinctNode.getOutSchema().size();
+            }
+          }
+        }
       }
     }
     // get a proper number of tasks
@@ -1145,7 +1162,8 @@ public class Repartitioner {
 
     // set the partition number for group by and sort
     if (channel.getShuffleType() == HASH_SHUFFLE) {
-      if (execBlock.getPlan().getType() == NodeType.GROUP_BY) {
+      if (execBlock.getPlan().getType() == NodeType.GROUP_BY ||
+          execBlock.getPlan().getType() == NodeType.DISTINCT_GROUP_BY) {
         keys = channel.getShuffleKeys();
       }
     } else if (channel.getShuffleType() == RANGE_SHUFFLE) {

Reply via email to