Ali Alsuliman has submitted this change and it was merged. ( https://asterix-gerrit.ics.uci.edu/3353 )
Change subject: [ASTERIXDB-2552][RT] Implement micro external sort for subplans ...................................................................... [ASTERIXDB-2552][RT] Implement micro external sort for subplans - user model changes: no - storage format changes: no - interface changes: no Details: This patch changes the in-memory sort used in subplans to a micro external sort to avoid out-of-memory errors. - added reset() to the runs merger to allow reusing the runs merger. - renamed "InMemoryStableSortPOperator" to "MicroStableSortPOperator". - changed the tag from "IN_MEMORY_STABLE_SORT" to "MICRO_STABLE_SORT". - added test cases. Change-Id: I930849d644c60d461d2869c9773b85e49b46fbdb Reviewed-on: https://asterix-gerrit.ics.uci.edu/3353 Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Dmitry Lychagin <dmitry.lycha...@couchbase.com> --- A asterixdb/asterix-app/src/test/resources/optimizerts/queries/micro_external_sort/micro_external_sort.sqlpp M asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.10.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.11.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.12.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.13.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.14.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.16.plan M 
asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.17.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.9.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/sugar-06-distinct.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan A asterixdb/asterix-app/src/test/resources/optimizerts/results/micro_external_sort/micro_external_sort.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/nest_aggregate.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/nested_loj3.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-dual-order.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-right-ahead.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-1.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/sorting/micro_external_sort/micro_external_sort.1.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/sorting/micro_external_sort/micro_external_sort.1.adm M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java R hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java 33 files changed, 328 insertions(+), 101 deletions(-) Approvals: Jenkins: Verified; ; Verified Anon. E. Moose (1000171): Dmitry Lychagin: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/micro_external_sort/micro_external_sort.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/micro_external_sort/micro_external_sort.sqlpp new file mode 100644 index 0000000..a075cd1 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/micro_external_sort/micro_external_sort.sqlpp @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// testing micro external sort in a subplan +drop dataverse test if exists; +create dataverse test; +use test; + +set `compiler.sortmemory` "130KB"; + +with ds AS ( +from range(0,89999) i +select value {"id": i, "a": +CASE ( i % 3) +WHEN 0 THEN "one" +WHEN 1 THEN "two" +WHEN 2 THEN "three" +END, +"b": CASE ( (i div 3) % 3) +WHEN 0 THEN "SUM_1s" +WHEN 1 THEN "SUM_2s" +WHEN 2 THEN "SUM_3s" END, +"c": CASE ((i div 3) % 3) +WHEN 0 THEN 1 +WHEN 1 THEN 2 +WHEN 2 THEN 3 END}) +SELECT `group`, +(SELECT group_num, sum(g.d.c) AS sum +FROM g +GROUP BY g.d.b AS group_num +ORDER BY sum +) AS sum_per_group +FROM ds d +GROUP BY d.a AS `group` +GROUP AS g +ORDER BY `group`; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan index 2918193..f3dbc01 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan @@ -10,7 +10,7 @@ -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } - -- IN_MEMORY_STABLE_SORT [$$200(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$200(ASC)] |LOCAL| -- ASSIGN |LOCAL| -- UNNEST |LOCAL| -- SUBPLAN |LOCAL| diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.10.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.10.plan index 6b66a53..f9d98dd 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.10.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.10.plan @@ -7,7 +7,7 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$62(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$62(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.11.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.11.plan index dd3bb8f..55e2f55 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.11.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.11.plan @@ -7,13 +7,13 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$62(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$62(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$67(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$67(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.12.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.12.plan index 1766fc3..469fa47 
100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.12.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.12.plan @@ -11,7 +11,7 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$66(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$66(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.13.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.13.plan index 98c4c4f..d89ba4a 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.13.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.13.plan @@ -7,7 +7,7 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$76(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$76(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } { @@ -17,7 +17,7 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$86(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$86(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.14.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.14.plan index b563fef..aa3554b 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.14.plan +++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.14.plan @@ -7,7 +7,7 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$100(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$100(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } { @@ -17,13 +17,13 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$110(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$110(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$105(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$105(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.16.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.16.plan index ab82bf0..79505b4 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.16.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.16.plan @@ -7,7 +7,7 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$x(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$x(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } { @@ -18,13 +18,13 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$z(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$z(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$y(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$y(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |LOCAL| diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.17.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.17.plan index 6249d46..61ecb60 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.17.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.17.plan @@ -10,13 +10,13 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$106(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$106(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$111(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$111(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| @@ -58,13 +58,13 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$124(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$124(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$129(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$129(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.9.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.9.plan index 6d8202e..4199770 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.9.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.9.plan @@ -7,7 +7,7 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - 
-- IN_MEMORY_STABLE_SORT [$$58(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$58(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/sugar-06-distinct.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/sugar-06-distinct.plan index f4eea8a..aeeb046 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/sugar-06-distinct.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/sugar-06-distinct.plan @@ -12,7 +12,7 @@ { -- AGGREGATE |LOCAL| -- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$45(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$45(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan index ef733cb..b85e9b0 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan @@ -5,7 +5,7 @@ { -- ASSIGN |LOCAL| -- ASSIGN |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$16(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$16(ASC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- COMMIT |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/micro_external_sort/micro_external_sort.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/micro_external_sort/micro_external_sort.plan new file mode 100644 index 0000000..4f8ff43 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/micro_external_sort/micro_external_sort.plan @@ -0,0 +1,27 @@ +-- DISTRIBUTE_RESULT |LOCAL| + -- ONE_TO_ONE_EXCHANGE |LOCAL| + -- STREAM_PROJECT |LOCAL| + -- ASSIGN |LOCAL| + -- 
ONE_TO_ONE_EXCHANGE |LOCAL| + -- PRE_CLUSTERED_GROUP_BY[$$145] |LOCAL| + { + -- AGGREGATE |LOCAL| + -- MICRO_STABLE_SORT [$$149(ASC)] |LOCAL| + -- MICRO_PRE_CLUSTERED_GROUP_BY[$$146] |LOCAL| + { + -- AGGREGATE |LOCAL| + -- AGGREGATE |LOCAL| + -- NESTED_TUPLE_SOURCE |LOCAL| + } + -- MICRO_STABLE_SORT [$$146(ASC)] |LOCAL| + -- ASSIGN |LOCAL| + -- NESTED_TUPLE_SOURCE |LOCAL| + } + -- ONE_TO_ONE_EXCHANGE |LOCAL| + -- STABLE_SORT [$$145(ASC)] |LOCAL| + -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED| + -- STREAM_PROJECT |UNPARTITIONED| + -- ASSIGN |UNPARTITIONED| + -- ASSIGN |UNPARTITIONED| + -- UNNEST |UNPARTITIONED| + -- EMPTY_TUPLE_SOURCE |UNPARTITIONED| \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/nest_aggregate.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/nest_aggregate.plan index d4cdff4..0b8a56d 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/nest_aggregate.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/nest_aggregate.plan @@ -8,13 +8,13 @@ { -- AGGREGATE |LOCAL| -- STREAM_LIMIT |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$39(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$39(ASC)] |LOCAL| -- MICRO_PRE_CLUSTERED_GROUP_BY[$$51] |LOCAL| { -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } - -- IN_MEMORY_STABLE_SORT [$$51(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$51(ASC)] |LOCAL| -- STREAM_SELECT |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/nested_loj3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/nested_loj3.plan index 957a2ee..9232477 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/nested_loj3.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/nested_loj3.plan @@ -19,7 +19,7 @@ -- STREAM_SELECT |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } - -- IN_MEMORY_STABLE_SORT [$$31(ASC), $$32(ASC), $$33(ASC)] |LOCAL| + -- 
MICRO_STABLE_SORT [$$31(ASC), $$32(ASC), $$33(ASC)] |LOCAL| -- STREAM_SELECT |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan index f2adbf6..241257b 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan @@ -15,7 +15,7 @@ -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } - -- IN_MEMORY_STABLE_SORT [$$19(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$19(ASC)] |LOCAL| -- ASSIGN |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan index 4405b9d..612a9b7 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan @@ -7,7 +7,7 @@ { -- AGGREGATE |LOCAL| -- STREAM_LIMIT |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$62(DESC)] |LOCAL| + -- MICRO_STABLE_SORT [$$62(DESC)] |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-dual-order.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-dual-order.plan index e8b029a..e7ac5d6 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-dual-order.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-dual-order.plan @@ -23,7 +23,7 @@ -- PRE_CLUSTERED_GROUP_BY[$$126] |PARTITIONED| { -- AGGREGATE |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$i(ASC)] |LOCAL| + -- 
MICRO_STABLE_SORT [$$i(ASC)] |LOCAL| -- STREAM_SELECT |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } @@ -113,7 +113,7 @@ -- PRE_CLUSTERED_GROUP_BY[$$129] |PARTITIONED| { -- AGGREGATE |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$i(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$i(ASC)] |LOCAL| -- STREAM_SELECT |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-right-ahead.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-right-ahead.plan index ab700b6..cd036ff 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-right-ahead.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-right-ahead.plan @@ -23,7 +23,7 @@ -- PRE_CLUSTERED_GROUP_BY[$$78] |PARTITIONED| { -- AGGREGATE |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$i(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$i(ASC)] |LOCAL| -- STREAM_SELECT |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } @@ -80,7 +80,7 @@ -- PRE_CLUSTERED_GROUP_BY[$$80] |PARTITIONED| { -- AGGREGATE |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$i(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$i(ASC)] |LOCAL| -- STREAM_SELECT |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-1.plan index f86825e..6e92aec 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-1.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-1.plan @@ -10,7 +10,7 @@ -- SUBPLAN |UNPARTITIONED| { -- AGGREGATE |UNPARTITIONED| - -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |UNPARTITIONED| + -- MICRO_STABLE_SORT [$$j(ASC)] |UNPARTITIONED| -- UNNEST |UNPARTITIONED| -- NESTED_TUPLE_SOURCE |UNPARTITIONED| } @@ -28,7 
+28,7 @@ -- AGGREGATE |LOCAL| -- STREAM_SELECT |UNPARTITIONED| -- RUNNING_AGGREGATE |UNPARTITIONED| - -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |UNPARTITIONED| + -- MICRO_STABLE_SORT [$$j(ASC)] |UNPARTITIONED| -- UNNEST |UNPARTITIONED| -- NESTED_TUPLE_SOURCE |UNPARTITIONED| } diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan index 2eb7603..0c0b45d 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan @@ -12,7 +12,7 @@ -- SUBPLAN |LOCAL| { -- AGGREGATE |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$j(ASC)] |LOCAL| -- UNNEST |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } @@ -22,7 +22,7 @@ -- SUBPLAN |PARTITIONED| { -- AGGREGATE |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$98(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$98(ASC)] |LOCAL| -- ASSIGN |LOCAL| -- UNNEST |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| @@ -41,7 +41,7 @@ -- SUBPLAN |LOCAL| { -- AGGREGATE |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |LOCAL| + -- MICRO_STABLE_SORT [$$j(ASC)] |LOCAL| -- UNNEST |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/sorting/micro_external_sort/micro_external_sort.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/sorting/micro_external_sort/micro_external_sort.1.query.sqlpp new file mode 100644 index 0000000..7d100cd --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/sorting/micro_external_sort/micro_external_sort.1.query.sqlpp @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// testing micro external sort in a subplan that generates runs and merges them +drop dataverse test if exists; +create dataverse test; +use test; + +set `compiler.sortmemory` "130KB"; + +with ds AS ( +from range(0,89999) i +select value {"id": i, "a": +CASE ( i % 3) +WHEN 0 THEN "one" +WHEN 1 THEN "two" +WHEN 2 THEN "three" +END, +"b": CASE ( (i div 3) % 3) +WHEN 0 THEN "SUM_1s" +WHEN 1 THEN "SUM_2s" +WHEN 2 THEN "SUM_3s" END, +"c": CASE ((i div 3) % 3) +WHEN 0 THEN 1 +WHEN 1 THEN 2 +WHEN 2 THEN 3 END}) +SELECT `group`, +(SELECT group_num, sum(g.d.c) AS sum +FROM g +GROUP BY g.d.b AS group_num +ORDER BY sum +) AS sum_per_group +FROM ds d +GROUP BY d.a AS `group` +GROUP AS g +ORDER BY `group`; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/sorting/micro_external_sort/micro_external_sort.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/sorting/micro_external_sort/micro_external_sort.1.adm new file mode 100644 index 0000000..6d57f23 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/sorting/micro_external_sort/micro_external_sort.1.adm @@ -0,0 +1,3 @@ +{ "group": "one", "sum_per_group": [ { "group_num": "SUM_1s", "sum": 10000 }, { "group_num": "SUM_2s", "sum": 20000 }, { 
"group_num": "SUM_3s", "sum": 30000 } ] } +{ "group": "three", "sum_per_group": [ { "group_num": "SUM_1s", "sum": 10000 }, { "group_num": "SUM_2s", "sum": 20000 }, { "group_num": "SUM_3s", "sum": 30000 } ] } +{ "group": "two", "sum_per_group": [ { "group_num": "SUM_1s", "sum": 10000 }, { "group_num": "SUM_2s", "sum": 20000 }, { "group_num": "SUM_3s", "sum": 30000 } ] } \ No newline at end of file diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java index 404a8dc..8e1f77f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java @@ -36,7 +36,7 @@ HDFS_READER, HYBRID_HASH_JOIN, IN_MEMORY_HASH_JOIN, - IN_MEMORY_STABLE_SORT, + MICRO_STABLE_SORT, INDEX_BULKLOAD, INDEX_INSERT_DELETE, INSERT_DELETE, diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java index 0c08369..81852d4 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java @@ -49,10 +49,12 @@ public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperator { - protected OrderColumn[] sortColumns; - protected 
ILocalStructuralProperty orderProp; + final int maxNumberOfFrames; + OrderColumn[] sortColumns; + ILocalStructuralProperty orderProp; - public AbstractStableSortPOperator() { + AbstractStableSortPOperator(int maxNumberOfFrames) { + this.maxNumberOfFrames = maxNumberOfFrames; } public OrderColumn[] getSortColumns() { diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java similarity index 93% rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java index d304421..c6a7d9d 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java @@ -37,14 +37,15 @@ import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -public class InMemoryStableSortPOperator extends AbstractStableSortPOperator { +public class MicroStableSortPOperator extends AbstractStableSortPOperator { - public InMemoryStableSortPOperator() { + public MicroStableSortPOperator(int maxNumberOfFrames) { + super(maxNumberOfFrames); } @Override public PhysicalOperatorTag getOperatorTag() { - return PhysicalOperatorTag.IN_MEMORY_STABLE_SORT; + return PhysicalOperatorTag.MICRO_STABLE_SORT; } @Override @@ -79,7 +80,7 @@ i++; } - IPushRuntimeFactory 
runtime = new InMemorySortRuntimeFactory(sortFields, nkcf, comps, null); + IPushRuntimeFactory runtime = new InMemorySortRuntimeFactory(sortFields, nkcf, comps, null, maxNumberOfFrames); builder.contributeMicroOperator(op, runtime, recDescriptor); ILogicalOperator src = op.getInputs().get(0).getValue(); builder.contributeGraphEdge(src, 0, op, 0); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java index d13c51a..9567e5b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java @@ -47,16 +47,14 @@ public class StableSortPOperator extends AbstractStableSortPOperator { - private int maxNumberOfFrames; - private int topK; + private final int topK; public StableSortPOperator(int maxNumberOfFrames) { this(maxNumberOfFrames, -1); } public StableSortPOperator(int maxNumberOfFrames, int topK) { - super(); - this.maxNumberOfFrames = maxNumberOfFrames; + super(maxNumberOfFrames); this.topK = topK; } @@ -86,6 +84,7 @@ IVariableTypeEnvironment env = context.getTypeEnvironment(op); int i = 0; + // TODO(ali): should refactor common code with micro sort op for (OrderColumn oc : sortColumns) { LogicalVariable var = oc.getColumn(); sortFields[i] = opSchema.findVariable(var); diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java index 
67c1f1d..a011abf 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -66,7 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroStableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator; @@ -449,7 +449,7 @@ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); if (op.getOperatorTag() != LogicalOperatorTag.ORDER || (op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.STABLE_SORT - && op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT) + && op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.MICRO_STABLE_SORT) || delivered.getLocalProperties() == null) { return false; } @@ -555,7 +555,7 @@ oo.setSourceLocation(sourceLoc); oo.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL); if (isMicroOp) { - oo.setPhysicalOperator(new InMemoryStableSortPOperator()); + oo.setPhysicalOperator(new MicroStableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort())); } else { oo.setPhysicalOperator(new 
StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort())); } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java index 5864c3b..344e103 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java @@ -89,7 +89,7 @@ } AbstractPhysicalOperator pOrder1 = (AbstractPhysicalOperator) op2.getPhysicalOperator(); if (pOrder1.getOperatorTag() != PhysicalOperatorTag.STABLE_SORT - && pOrder1.getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT) { + && pOrder1.getOperatorTag() != PhysicalOperatorTag.MICRO_STABLE_SORT) { return false; } // StableSortPOperator sort1 = (StableSortPOperator) pOrder1; diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index 49e5a0b..f127898 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -86,7 +86,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeleteUpsertPOperator; @@ -94,6 +93,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.LeftOuterUnnestPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreSortedDistinctByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroStableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroUnionAllPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator; @@ -271,7 +271,7 @@ if (topLevelOp) { return new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK()); } else { - return new InMemoryStableSortPOperator(); + return new MicroStableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()); } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java index 2453029..b34b7b9 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java @@ -19,39 +19,43 @@ package org.apache.hyracks.algebricks.runtime.operators.sort; import java.nio.ByteBuffer; +import java.util.List; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory; +import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.resources.IDeallocatable; import org.apache.hyracks.api.util.CleanupUtils; +import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy; -import org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory; -import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager; -import org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager; -import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool; -import org.apache.hyracks.dataflow.std.sort.FrameSorterMergeSort; +import org.apache.hyracks.dataflow.std.sort.Algorithm; +import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator; +import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger; public 
class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory { private static final long serialVersionUID = 1L; - + private final int framesLimit; private final int[] sortFields; private final INormalizedKeyComputerFactory[] keyNormalizerFactories; private final IBinaryComparatorFactory[] comparatorFactories; public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory, - IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) { + IBinaryComparatorFactory[] comparatorFactories, int[] projectionList, int framesLimit) { this(sortFields, firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory } : null, comparatorFactories, - projectionList); + projectionList, framesLimit); } public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, - IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) { + IBinaryComparatorFactory[] comparatorFactories, int[] projectionList, int framesLimit) { super(projectionList); // Obs: the projection list is currently ignored. 
if (projectionList != null) { @@ -60,52 +64,122 @@ this.sortFields = sortFields; this.keyNormalizerFactories = keyNormalizerFactories; this.comparatorFactories = comparatorFactories; + this.framesLimit = framesLimit; } @Override public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException { - return new AbstractOneInputOneOutputPushRuntime() { - FrameSorterMergeSort frameSorter = null; + InMemorySortPushRuntime pushRuntime = new InMemorySortPushRuntime(ctx); + ctx.registerDeallocatable(pushRuntime); + return pushRuntime; + } - @Override - public void open() throws HyracksDataException { - super.open(); - if (frameSorter == null) { - IFrameBufferManager manager = new VariableFrameMemoryManager( - new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY), - FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT)); - frameSorter = new FrameSorterMergeSort(ctx, manager, VariableFramePool.UNLIMITED_MEMORY, sortFields, - keyNormalizerFactories, comparatorFactories, outputRecordDesc); - } - frameSorter.reset(); + private class InMemorySortPushRuntime extends AbstractOneInputOneOutputPushRuntime implements IDeallocatable { + final IHyracksTaskContext ctx; + ExternalSortRunGenerator runsGenerator = null; + ExternalSortRunMerger runsMerger = null; + IFrameWriter wrappingWriter = null; + + private InMemorySortPushRuntime(IHyracksTaskContext ctx) { + this.ctx = ctx; + } + + @Override + public void open() throws HyracksDataException { + if (runsGenerator == null) { + runsGenerator = new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories, + comparatorFactories, outputRecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, + framesLimit, Integer.MAX_VALUE); } + // next writer will be opened later when preparing the merger + isOpen = true; + runsGenerator.open(); + runsGenerator.getSorter().reset(); + } - @Override - public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException { - frameSorter.insertFrame(buffer); - } + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + runsGenerator.nextFrame(buffer); + } - @Override - public void close() throws HyracksDataException { - Throwable failure = null; - if (isOpen) { - try { - if (!failed) { - frameSorter.sort(); - frameSorter.flush(writer); + @Override + public void close() throws HyracksDataException { + Throwable failure = null; + if (isOpen) { + try { + if (!failed) { + runsGenerator.close(); + createOrResetRunsMerger(); + if (runsGenerator.getRuns().isEmpty()) { + wrappingWriter = runsMerger.prepareSkipMergingFinalResultWriter(writer); + wrappingWriter.open(); + if (runsGenerator.getSorter().hasRemaining()) { + runsGenerator.getSorter().flush(wrappingWriter); + } + } else { + wrappingWriter = runsMerger.prepareFinalMergeResultWriter(writer); + wrappingWriter.open(); + runsMerger.process(wrappingWriter); } + } + } catch (Throwable th) { + failure = th; + fail(th); + } finally { + failure = CleanupUtils.close(wrappingWriter, failure); + wrappingWriter = null; + } + } + isOpen = false; + if (failure != null) { + throw HyracksDataException.create(failure); + } + } + + @Override + public void fail() throws HyracksDataException { + failed = true; + // clean up the runs if some have been generated. double close should be idempotent. 
+ if (runsGenerator != null) { + List<GeneratedRunFileReader> runs = runsGenerator.getRuns(); + for (int i = 0, size = runs.size(); i < size; i++) { + try { + runs.get(i).close(); } catch (Throwable th) { - failure = th; - fail(th); - } finally { - failure = CleanupUtils.close(writer, failure); + // ignore } } - if (failure != null) { - throw HyracksDataException.create(failure); + } + if (wrappingWriter != null) { + wrappingWriter.fail(); + } + } + + @Override + public void deallocate() { + if (runsGenerator != null) { + try { + runsGenerator.getSorter().close(); + } catch (Exception e) { + // ignore } } - }; + } + + private void createOrResetRunsMerger() { + if (runsMerger == null) { + IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; + for (int i = 0; i < comparatorFactories.length; ++i) { + comparators[i] = comparatorFactories[i].createBinaryComparator(); + } + INormalizedKeyComputer nmkComputer = + keyNormalizerFactories == null ? null : keyNormalizerFactories[0].createNormalizedKeyComputer(); + runsMerger = new ExternalSortRunMerger(ctx, runsGenerator.getRuns(), sortFields, comparators, + nmkComputer, outputRecordDesc, framesLimit, Integer.MAX_VALUE); + } else { + runsMerger.reset(runsGenerator.getRuns()); + } + } } } diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java index 99967c1..e9b3fc3 100644 --- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java +++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java @@ -720,7 +720,7 @@ // the algebricks op. 
InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 1 }, (INormalizedKeyComputerFactory) null, - new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, null); + new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, null, 50); RecordDescriptor sortDesc = scannerDesc; String fileName = "scanMicroSortWrite.out"; @@ -836,7 +836,7 @@ RecordDescriptor sortDesc = scannerDesc; InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 3 }, (INormalizedKeyComputerFactory) null, - new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, null); + new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, null, 50); // the group-by NestedTupleSourceRuntimeFactory nts = new NestedTupleSourceRuntimeFactory(); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java index e860288..08b2303 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java @@ -36,6 +36,7 @@ import org.apache.hyracks.dataflow.common.io.RunFileReader; import org.apache.hyracks.dataflow.common.io.RunFileWriter; import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame; +import org.apache.hyracks.util.annotations.CriticalPath; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,10 +49,12 @@ private final IBinaryComparator[] comparators; private final INormalizedKeyComputer nmkComputer; private final RecordDescriptor recordDesc; - private final int framesLimit; 
+ private final int maxMergeWidth; private final int topK; + private List<GeneratedRunFileReader> partialRuns; private List<GroupVSizeFrame> inFrames; private VSizeFrame outputFrame; + private boolean first; private static final Logger LOGGER = LogManager.getLogger(); public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs, @@ -69,16 +72,15 @@ this.comparators = comparators; this.nmkComputer = nmkComputer; this.recordDesc = recordDesc; - this.framesLimit = framesLimit; + this.maxMergeWidth = framesLimit - 1; this.topK = topK; + this.first = true; } + @CriticalPath public void process(IFrameWriter finalWriter) throws HyracksDataException { try { - int maxMergeWidth = framesLimit - 1; - inFrames = new ArrayList<>(maxMergeWidth); - outputFrame = new VSizeFrame(ctx); - List<GeneratedRunFileReader> partialRuns = new ArrayList<>(maxMergeWidth); + createReusableObjects(); int stop = runs.size(); currentGenerationRunAvailable.set(0, stop); int numberOfPasses = 1; @@ -210,6 +212,21 @@ } } + public void reset(List<GeneratedRunFileReader> newRuns) { + this.runs.clear(); + this.runs.addAll(newRuns); + this.currentGenerationRunAvailable.clear(); + } + + private void createReusableObjects() throws HyracksDataException { + if (first) { + first = false; + inFrames = new ArrayList<>(maxMergeWidth); + outputFrame = new VSizeFrame(ctx); + partialRuns = new ArrayList<>(maxMergeWidth); + } + } + public abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException; -- To view, visit https://asterix-gerrit.ics.uci.edu/3353 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: merged Gerrit-Change-Id: I930849d644c60d461d2869c9773b85e49b46fbdb Gerrit-Change-Number: 3353 Gerrit-PatchSet: 4 Gerrit-Owner: Ali Alsuliman <ali.al.solai...@gmail.com> Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com> Gerrit-Reviewer: Anon. 
E. Moose (1000171) Gerrit-Reviewer: Dmitry Lychagin <dmitry.lycha...@couchbase.com> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Michael Blow <mb...@apache.org> Gerrit-Reviewer: Till Westmann <ti...@apache.org>