http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 919ac9b..1f348ff 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -43,13 +43,18 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.json.CoreGsonHelper; +import org.apache.tajo.engine.plan.proto.PlanProto; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; import org.apache.tajo.engine.planner.PlannerUtil; +import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; +import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; import org.apache.tajo.master.*; import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; @@ -810,9 +815,30 @@ public class SubQuery implements EventHandler<SubQueryEvent> { if (grpNode.getType() == NodeType.GROUP_BY) { hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0; } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) { - hasGroupColumns = ((DistinctGroupbyNode)grpNode).getGroupingColumns().length > 0; + // Find current distinct stage node. + DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); + if (distinctNode == null) { + LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode"); + distinctNode = (DistinctGroupbyNode)grpNode; + } + hasGroupColumns = distinctNode.getGroupingColumns().length > 0; + + Enforcer enforcer = subQuery.getBlock().getEnforcer(); + if (enforcer == null) { + LOG.warn(subQuery.getId() + ", DistinctGroupbyNode's enforcer is null."); + } + EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode); + if (property != null) { + if (property.getDistinct().getIsMultipleAggregation()) { + MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage(); + if (stage != MultipleAggregationStage.THRID_STAGE) { + hasGroupColumns = true; + } + } + } } if (!hasGroupColumns) { + LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); return 1; } else { long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index bde2459..2760301 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -315,6 +315,12 @@ message DistinctGroupbyEnforcer { SORT_AGGREGATION = 1; } + enum MultipleAggregationStage { + FIRST_STAGE = 0; + SECOND_STAGE = 1; + THRID_STAGE = 3; + } + message SortSpecArray { required int32 pid = 1; repeated SortSpecProto sortSpecs = 2; @@ -322,6 +328,8 @@ message DistinctGroupbyEnforcer { required int32 pid = 1; required DistinctAggregationAlgorithm algorithm = 2; repeated SortSpecArray sortSpecArrays = 3; + required bool isMultipleAggregation = 4 [default = false]; + optional MultipleAggregationStage multipleAggregationStage = 5; } message EnforcerProto { http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java index fccec26..8b9f9f7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java @@ -20,10 +20,7 @@ package org.apache.tajo.engine.query; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.IntegrationTest; -import org.apache.tajo.QueryTestCaseBase; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.*; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf.ConfVars; @@ -34,9 +31,14 @@ import org.apache.tajo.master.querymaster.QueryUnit; import org.apache.tajo.master.querymaster.SubQuery; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TajoWorker; +import org.junit.AfterClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import java.sql.ResultSet; import java.util.*; @@ -44,11 +46,33 @@ import java.util.*; import static org.junit.Assert.*; @Category(IntegrationTest.class) +@RunWith(Parameterized.class) public class TestGroupByQuery extends QueryTestCaseBase { private static final Log LOG = LogFactory.getLog(TestGroupByQuery.class); - public TestGroupByQuery() throws Exception { + public TestGroupByQuery(String groupByOption) throws Exception { super(TajoConstants.DEFAULT_DATABASE_NAME); + + Map<String, String> variables = new HashMap<String, String>(); + if (groupByOption.equals("MultiLevel")) { + variables.put(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname(), "true"); + } else { + variables.put(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname(), "false"); + } + client.updateSessionVariables(variables); + } + + @AfterClass + public static void tearDown() throws Exception { + client.unsetSessionVariables(TUtil.newList(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname())); + } + + @Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][]{ + {"MultiLevel"}, + {"No-MultiLevel"}, + }); } @Test @@ -285,6 +309,24 @@ public class TestGroupByQuery extends QueryTestCaseBase { } @Test + public final void testDistinctAggregation8() throws Exception { + /* + select + sum(distinct l_orderkey), + l_linenumber, l_returnflag, l_linestatus, l_shipdate, + count(distinct l_partkey), + sum(l_orderkey) + from + lineitem + group by + l_linenumber, l_returnflag, l_linestatus, l_shipdate; + */ + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + + @Test public final void testDistinctAggregationWithHaving1() throws Exception { // select l_linenumber, count(*), count(distinct l_orderkey), sum(distinct l_orderkey) from lineitem // group by l_linenumber having sum(distinct l_orderkey) >= 6; @@ -343,6 +385,14 @@ public class TestGroupByQuery extends QueryTestCaseBase { assertResultSet(res, "testDistinctAggregation_case8.result"); res.close(); + res = executeFile("testDistinctAggregation_case9.sql"); + assertResultSet(res, "testDistinctAggregation_case9.result"); + res.close(); + + res = executeFile("testDistinctAggregation_case10.sql"); + assertResultSet(res, "testDistinctAggregation_case10.result"); + res.close(); + // case9 KeyValueSet tableOptions = new KeyValueSet(); tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql new file mode 100644 index 0000000..0553d06 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql @@ -0,0 +1,9 @@ +select + sum(distinct l_orderkey), + l_linenumber, l_returnflag, l_linestatus, l_shipdate, + count(distinct l_partkey), + sum(l_orderkey) +from + lineitem +group by + l_linenumber, l_returnflag, l_linestatus, l_shipdate; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql new file mode 100644 index 0000000..6ab7c25 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql @@ -0,0 +1,5 @@ +select sum(cnt1), sum(sum2) +from ( + select o_orderdate, count(distinct o_orderpriority), count(distinct o_orderkey) cnt1, sum(o_totalprice) sum2 + from orders group by o_orderdate +) a \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql new file mode 100644 index 0000000..6265599 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql @@ -0,0 +1,11 @@ +select + lineitem.l_orderkey as l_orderkey, + count(distinct lineitem.l_partkey) as cnt1, + sum(lineitem.l_quantity + lineitem.l_linenumber)/count(distinct lineitem.l_suppkey) as value2, + lineitem.l_partkey as l_partkey, + avg(lineitem.l_quantity) as avg1, + count(distinct lineitem.l_suppkey) as cnt2 +from + lineitem +group by + lineitem.l_orderkey, lineitem.l_partkey \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result new file mode 100644 index 0000000..519390d --- /dev/null +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result @@ -0,0 +1,7 @@ +?sum,l_linenumber,l_returnflag,l_linestatus,l_shipdate,?count_1,?sum_2 +------------------------------- +1,1,N,O,1996-03-13,1,1 +2,1,N,O,1997-01-28,1,2 +3,1,R,F,1994-02-02,1,3 +1,2,N,O,1996-04-12,1,1 +3,2,R,F,1993-11-09,1,3 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result new file mode 100644 index 0000000..2035d9f --- /dev/null +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result @@ -0,0 +1,3 @@ +?sum,?sum_1 +------------------------------- +3,414440.9 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result new file mode 100644 index 0000000..506eea0 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result @@ -0,0 +1,6 @@ +l_orderkey,cnt1,value2,l_partkey,avg1,cnt2 +------------------------------- +1,1,28.0,1,26.5,2 +2,1,39.0,2,38.0,1 +3,1,46.0,2,45.0,1 +3,1,51.0,3,49.0,1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index e6b12b1..25f1ae7 100644 --- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -25,6 +25,7 @@ Available Session Variables: \set JOIN_PER_SHUFFLE_SIZE [int value] - shuffle output size for join (mb) \set GROUPBY_PER_SHUFFLE_SIZE [int value] - shuffle output size for sort (mb) \set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb) +\set GROUPBY_MULTI_LEVEL_ENABLED [true or false] - Multiple level groupby enabled \set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb) \set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb) \set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb) http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java index 51388a4..084c105 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java @@ -42,10 +42,6 @@ public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComp @SuppressWarnings("unused") private final boolean[] nullFirsts; - private Datum left; - private Datum right; - private int compVal; - /** * @param schema The schema of input tuples * @param sortKeys The description of sort keys @@ -88,6 +84,10 @@ public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComp @Override public int compare(Tuple tuple1, Tuple tuple2) { + Datum left = null; + Datum right = null; + int compVal = 0; + for (int i = 0; i < sortKeyIds.length; i++) { left = tuple1.get(sortKeyIds[i]); right = tuple2.get(sortKeyIds[i]);
