This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2de0888e96 [multistage] Bug Fixes and Improvements to Physical
Optimizer (#15813)
2de0888e96 is described below
commit 2de0888e96e8d8ae0b322cfdc7ca7ce466b03bce
Author: Ankit Sultana <[email protected]>
AuthorDate: Fri May 16 17:29:37 2025 -0500
[multistage] Bug Fixes and Improvements to Physical Optimizer (#15813)
---
.../planner/physical/v2/ExchangeStrategy.java | 66 ++++++-
.../v2/PlanFragmentAndMailboxAssignment.java | 25 +--
.../planner/physical/v2/RelToPRelConverter.java | 4 +-
.../physical/v2/nodes/PhysicalAggregate.java | 15 +-
.../physical/v2/nodes/PhysicalExchange.java | 29 +--
.../planner/physical/v2/ExchangeStrategyTest.java | 69 +++++++
.../resources/queries/PhysicalOptimizerPlans.json | 205 +++++++++++++--------
7 files changed, 281 insertions(+), 132 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java
index 82441f47aa..43d7974750 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java
@@ -18,6 +18,11 @@
*/
package org.apache.pinot.query.planner.physical.v2;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributions;
+
+
/**
* Defines how data is transferred across an Exchange.
*/
@@ -25,17 +30,17 @@ public enum ExchangeStrategy {
/**
* There's a single stream in the receiver, so each stream in the sender
sends data to the same.
*/
- SINGLETON_EXCHANGE,
+ SINGLETON_EXCHANGE(false),
/**
* stream-ID X sends data to stream-ID X. This cannot be modeled by
PARTITIONING_EXCHANGE because the fan-out for
* this type of exchange is 1:1.
*/
- IDENTITY_EXCHANGE,
+ IDENTITY_EXCHANGE(false),
/**
* Each stream will partition the outgoing stream based on a set of keys and
a hash function.
* Fanout for this type of exchange is 1:all.
*/
- PARTITIONING_EXCHANGE,
+ PARTITIONING_EXCHANGE(true),
/**
* stream-ID X will sub-partition: i.e. divide the stream so that the data
is sent to the streams
* {@code X, X + F, X + 2*F, ...}. Here F is the sub-partitioning factor.
Records are assigned based on a
@@ -43,25 +48,70 @@ public enum ExchangeStrategy {
* partition counts divides the other.
* <b>Note:</b> This is different and better than partitioning exchange
because the fanout is F, and not N * (N*F).
*/
- SUB_PARTITIONING_HASH_EXCHANGE,
+ SUB_PARTITIONING_HASH_EXCHANGE(true),
/**
* Same as above but records are sub-partitioned in a round-robin way. This
will increase parallelism but lose
* data partitioning.
*/
- SUB_PARTITIONING_RR_EXCHANGE,
+ SUB_PARTITIONING_RR_EXCHANGE(false),
/**
* Similar to sub-partitioning, except it does the inverse and merges
partitions. Partitions are merged in a way
* that we still preserve partitions, but only change the partition count.
i.e. if current partition count is 16,
* and we want 8 partitions, then partition-0 in the receiver will receive
data from partition-0 and partition-8
* in the sender.
*/
- COALESCING_PARTITIONING_EXCHANGE,
+ COALESCING_PARTITIONING_EXCHANGE(true),
/**
* Each stream will send data to all receiving streams.
*/
- BROADCAST_EXCHANGE,
+ BROADCAST_EXCHANGE(false),
/**
* Records are sent randomly from a given worker in the sender to some
worker in the receiver.
*/
- RANDOM_EXCHANGE
+ RANDOM_EXCHANGE(false);
+
+ /**
+ * This is true when the Exchange Strategy is such that it requires a
List<Integer> representing the
+ * distribution keys. The list must be non-empty.
+ */
+ private final boolean _requireKeys;
+
+ /**
+ * See {@link #_requireKeys}.
+ */
+ public boolean isRequireKeys() {
+ return _requireKeys;
+ }
+
+ ExchangeStrategy(boolean requireKeys) {
+ _requireKeys = requireKeys;
+ }
+
+ public static RelDistribution getRelDistribution(ExchangeStrategy
exchangeStrategy, List<Integer> keys) {
+ if (exchangeStrategy.isRequireKeys() && keys.isEmpty()) {
+ throw new IllegalStateException(String.format("ExchangeStrategy=%s
requires distribution keys, but none found",
+ exchangeStrategy));
+ } else if (!exchangeStrategy.isRequireKeys() && !keys.isEmpty()) {
+ throw new IllegalStateException(String.format(
+ "ExchangeStrategy=%s does not require distribution keys but found
%s", exchangeStrategy, keys));
+ }
+ switch (exchangeStrategy) {
+ case PARTITIONING_EXCHANGE:
+ case SUB_PARTITIONING_HASH_EXCHANGE:
+ case COALESCING_PARTITIONING_EXCHANGE:
+ return RelDistributions.hash(keys);
+ case IDENTITY_EXCHANGE:
+ return RelDistributions.hash(List.of());
+ case BROADCAST_EXCHANGE:
+ return RelDistributions.BROADCAST_DISTRIBUTED;
+ case SINGLETON_EXCHANGE:
+ return RelDistributions.SINGLETON;
+ case SUB_PARTITIONING_RR_EXCHANGE:
+ return RelDistributions.ROUND_ROBIN_DISTRIBUTED;
+ case RANDOM_EXCHANGE:
+ return RelDistributions.RANDOM_DISTRIBUTED;
+ default:
+ throw new IllegalStateException(String.format("Unexpected exchange
strategy: %s", exchangeStrategy));
+ }
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
index a7733b1a28..88761f0bcf 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
@@ -116,7 +116,8 @@ public class PlanFragmentAndMailboxAssignment {
int senderFragmentId = context._planFragmentMap.size();
final DataSchema inputFragmentSchema =
PRelToPlanNodeConverter.toDataSchema(
pRelNode.getPRelInput(0).unwrap().getRowType());
- RelDistribution.Type distributionType =
inferDistributionType(physicalExchange.getExchangeStrategy());
+ RelDistribution.Type distributionType =
ExchangeStrategy.getRelDistribution(
+ physicalExchange.getExchangeStrategy(),
physicalExchange.getDistributionKeys()).getType();
List<PlanNode> inputs = new ArrayList<>();
MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId,
inputFragmentSchema, inputs, currentFragmentId,
PinotRelExchangeType.getDefaultExchangeType(), distributionType,
physicalExchange.getDistributionKeys(),
@@ -156,27 +157,6 @@ public class PlanFragmentAndMailboxAssignment {
return fragment;
}
- private RelDistribution.Type inferDistributionType(ExchangeStrategy desc) {
- RelDistribution.Type distributionType;
- switch (desc) {
- case PARTITIONING_EXCHANGE:
- distributionType = RelDistribution.Type.HASH_DISTRIBUTED;
- break;
- case IDENTITY_EXCHANGE:
- distributionType = RelDistribution.Type.HASH_DISTRIBUTED;
- break;
- case SINGLETON_EXCHANGE:
- distributionType = RelDistribution.Type.SINGLETON;
- break;
- case BROADCAST_EXCHANGE:
- distributionType = RelDistribution.Type.BROADCAST_DISTRIBUTED;
- break;
- default:
- throw new IllegalStateException("");
- }
- return distributionType;
- }
-
private Map<Integer, QueryServerInstance> createWorkerMap(List<String>
workers, Context context) {
Map<Integer, QueryServerInstance> mp = new HashMap<>();
for (String instance : workers) {
@@ -225,6 +205,7 @@ public class PlanFragmentAndMailboxAssignment {
.put(senderStageId, new
SharedMailboxInfos(mailboxInfoListForReceiver));
break;
}
+ case RANDOM_EXCHANGE:
case PARTITIONING_EXCHANGE:
case BROADCAST_EXCHANGE: {
MailboxInfos mailboxInfoListForSender = new
MailboxInfos(createMailboxInfo(receiverWorkers));
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
index 67a24c75fe..f3cd36452e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
@@ -94,9 +94,11 @@ public class RelToPRelConverter {
} else if (relNode instanceof PinotLogicalAggregate) {
Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input
of agg. Found: %s", inputs);
PinotLogicalAggregate aggRel = (PinotLogicalAggregate) relNode;
+ // Use AggType.DIRECT here because at this point aggregation split
hasn't happened yet.
+ AggregateNode.AggType aggType = AggregateNode.AggType.DIRECT;
return new PhysicalAggregate(aggRel.getCluster(), aggRel.getTraitSet(),
aggRel.getHints(), aggRel.getGroupSet(),
aggRel.getGroupSets(), aggRel.getAggCallList(),
nodeIdGenerator.get(), inputs.get(0), null, false,
- AggregateNode.AggType.DIRECT, false, List.of(), 0);
+ aggType, aggRel.isLeafReturnFinalResult(), aggRel.getCollations(),
aggRel.getLimit());
} else if (relNode instanceof Join) {
Preconditions.checkState(relNode.getInputs().size() == 2, "Expected
exactly 2 inputs to join. Found: %s", inputs);
Join join = (Join) relNode;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java
index 1306c8d794..149795eb46 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.hint.RelHint;
@@ -51,7 +52,7 @@ public class PhysicalAggregate extends Aggregate implements
PRelNode {
public PhysicalAggregate(RelOptCluster cluster, RelTraitSet traitSet,
List<RelHint> hints,
ImmutableBitSet groupSet, @Nullable List<ImmutableBitSet> groupSets,
List<AggregateCall> aggCalls,
int nodeId, PRelNode pRelInput, @Nullable PinotDataDistribution
pinotDataDistribution, boolean leafStage,
- AggregateNode.AggType aggType, boolean leafReturnFinalResult,
List<RelFieldCollation> collations,
+ AggregateNode.AggType aggType, boolean leafReturnFinalResult, @Nullable
List<RelFieldCollation> collations,
int limit) {
super(cluster, traitSet, hints, pRelInput.unwrap(), groupSet, groupSets,
aggCalls);
_nodeId = nodeId;
@@ -60,7 +61,7 @@ public class PhysicalAggregate extends Aggregate implements
PRelNode {
_leafStage = leafStage;
_aggType = aggType;
_leafReturnFinalResult = leafReturnFinalResult;
- _collations = collations;
+ _collations = collations == null ? List.of() : collations;
_limit = limit;
}
@@ -114,6 +115,16 @@ public class PhysicalAggregate extends Aggregate
implements PRelNode {
_collations, _limit);
}
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ RelWriter relWriter = super.explainTerms(pw);
+ relWriter.item("aggType", _aggType);
+ relWriter.itemIf("leafReturnFinalResult", true, _leafReturnFinalResult);
+ relWriter.itemIf("collations", _collations, !_collations.isEmpty());
+ relWriter.itemIf("limit", _limit, _limit > 0);
+ return relWriter;
+ }
+
public AggregateNode.AggType getAggType() {
return _aggType;
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
index 66846faff7..b00d786f00 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
@@ -26,10 +26,10 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Exchange;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTraitDef;
@@ -81,7 +81,7 @@ public class PhysicalExchange extends Exchange implements
PRelNode {
List<Integer> distributionKeys, ExchangeStrategy exchangeStrategy,
@Nullable RelCollation relCollation,
PinotExecStrategyTrait execStrategyTrait) {
super(input.unwrap().getCluster(),
EMPTY_TRAIT_SET.plus(execStrategyTrait), input.unwrap(),
- getRelDistribution(exchangeStrategy, distributionKeys));
+ ExchangeStrategy.getRelDistribution(exchangeStrategy,
distributionKeys));
_nodeId = nodeId;
_pRelInputs = Collections.singletonList(input);
_pinotDataDistribution = pinotDataDistribution;
@@ -161,26 +161,9 @@ public class PhysicalExchange extends Exchange implements
PRelNode {
@Override public RelWriter explainTerms(RelWriter pw) {
return pw.item("input", input)
.item("exchangeStrategy", _exchangeStrategy)
- .item("distKeys", _distributionKeys)
- .item("execStrategy", getRelExchangeType())
- .item("collation", _relCollation);
- }
-
- private static RelDistribution getRelDistribution(ExchangeStrategy
exchangeStrategy, List<Integer> keys) {
- switch (exchangeStrategy) {
- case IDENTITY_EXCHANGE:
- case PARTITIONING_EXCHANGE:
- case SUB_PARTITIONING_HASH_EXCHANGE:
- case COALESCING_PARTITIONING_EXCHANGE:
- return RelDistributions.hash(keys);
- case BROADCAST_EXCHANGE:
- return RelDistributions.BROADCAST_DISTRIBUTED;
- case SINGLETON_EXCHANGE:
- return RelDistributions.SINGLETON;
- case SUB_PARTITIONING_RR_EXCHANGE:
- return RelDistributions.ROUND_ROBIN_DISTRIBUTED;
- default:
- throw new IllegalStateException(String.format("Unexpected exchange
strategy: %s", exchangeStrategy));
- }
+ .itemIf("distKeys", _distributionKeys,
CollectionUtils.isNotEmpty(_distributionKeys))
+ .itemIf("execStrategy", getRelExchangeType(),
+ getRelExchangeType() !=
PinotRelExchangeType.getDefaultExchangeType())
+ .itemIf("collation", _relCollation,
CollectionUtils.isNotEmpty(_relCollation.getFieldCollations()));
}
}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategyTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategyTest.java
new file mode 100644
index 0000000000..b0bc6ea11c
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategyTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2;
+
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class ExchangeStrategyTest {
+ @Test
+ public void testGetRelDistribution() {
+ // Ensure each exchange strategy can be mapped to a RelDistribution.
+ for (ExchangeStrategy exchangeStrategy : ExchangeStrategy.values()) {
+ List<Integer> keys = exchangeStrategy.isRequireKeys() ? List.of(1) :
List.of();
+ assertNotNull(ExchangeStrategy.getRelDistribution(exchangeStrategy,
keys));
+ }
+ // Manual check for each exchange strategy.
+ // Start with hash types
+ List<ExchangeStrategy> hashExchangeStrategies =
List.of(ExchangeStrategy.PARTITIONING_EXCHANGE,
+ ExchangeStrategy.SUB_PARTITIONING_HASH_EXCHANGE,
ExchangeStrategy.COALESCING_PARTITIONING_EXCHANGE);
+ for (ExchangeStrategy exchangeStrategy : hashExchangeStrategies) {
+ assertEquals(ExchangeStrategy.getRelDistribution(exchangeStrategy,
List.of(1)).getType(),
+ RelDistribution.Type.HASH_DISTRIBUTED);
+ }
+ // Singleton exchange
+
assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.SINGLETON_EXCHANGE,
List.of()).getType(),
+ RelDistribution.Type.SINGLETON);
+ // Random exchange
+
assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.RANDOM_EXCHANGE,
List.of()).getType(),
+ RelDistribution.Type.RANDOM_DISTRIBUTED);
+ // Broadcast exchange
+
assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.BROADCAST_EXCHANGE,
List.of()).getType(),
+ RelDistribution.Type.BROADCAST_DISTRIBUTED);
+ // Sub-partitioning round robin exchange
+
assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.SUB_PARTITIONING_RR_EXCHANGE,
+ List.of()).getType(), RelDistribution.Type.ROUND_ROBIN_DISTRIBUTED);
+ }
+
+ @Test
+ public void testGetRelDistributionInvalid() {
+ // Test case when empty keys list but strategy requires keys
+ assertThrows(IllegalStateException.class, () -> {
+
ExchangeStrategy.getRelDistribution(ExchangeStrategy.PARTITIONING_EXCHANGE,
List.of());
+ });
+ // Test case when non-empty keys list but strategy does not accept keys
+ assertThrows(IllegalStateException.class, () -> {
+ ExchangeStrategy.getRelDistribution(ExchangeStrategy.IDENTITY_EXCHANGE,
List.of(1));
+ });
+ }
+}
diff --git
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index af9048bec3..acab86c39c 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -7,13 +7,13 @@
"output": [
"Execution Plan",
"\nPhysicalSort(sort0=[$0], dir0=[ASC])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalProject(col1=[$0], ts=[$1], col3=[$3])",
"\n PhysicalJoin(condition=[=($0, $2)], joinType=[inner])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
"\n PhysicalProject(col1=[$0], ts=[$7])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
"\n PhysicalProject(col2=[$1], col3=[$2])",
"\n PhysicalTableScan(table=[[default, b]])",
"\n"
@@ -25,13 +25,13 @@
"output": [
"Execution Plan",
"\nPhysicalSort(sort0=[$0], dir0=[ASC])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalProject(value1=[$0], ts1=[$1], col3=[$3])",
"\n PhysicalJoin(condition=[=($0, $2)], joinType=[inner])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
"\n PhysicalProject(col1=[$0], ts=[$7])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
"\n PhysicalProject(col2=[$1], col3=[$2])",
"\n PhysicalTableScan(table=[[default, b]])",
"\n"
@@ -43,9 +43,9 @@
"output": [
"Execution Plan",
"\nPhysicalJoin(condition=[=($0, $10)], joinType=[inner])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[1]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[1]])",
"\n PhysicalTableScan(table=[[default, b]])",
"\n"
]
@@ -56,10 +56,10 @@
"output": [
"Execution Plan",
"\nPhysicalJoin(condition=[=($0, $10)], joinType=[inner])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]])",
"\n PhysicalFilter(condition=[>=($2, 0)])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[1]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[1]])",
"\n PhysicalTableScan(table=[[default, b]])",
"\n"
]
@@ -70,10 +70,10 @@
"output": [
"Execution Plan",
"\nPhysicalJoin(condition=[AND(=($0, $10), >($2, $11))],
joinType=[inner])",
- "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalFilter(condition=[>=($2, 0)])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[BROADCAST_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[BROADCAST_EXCHANGE])",
"\n PhysicalTableScan(table=[[default, b]])",
"\n"
]
@@ -84,9 +84,9 @@
"output": [
"Execution Plan",
"\nPhysicalJoin(condition=[AND(=($0, $9), =($1, $10))],
joinType=[inner])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0, 1]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0, 1]])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0, 1]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0, 1]])",
"\n PhysicalTableScan(table=[[default, b]])",
"\n"
]
@@ -102,10 +102,10 @@
"Execution Plan",
"\nPhysicalProject(col1=[$0], col2=[$1])",
"\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[2]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[2]])",
"\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]])",
"\n PhysicalProject(col3=[$2])",
"\n PhysicalTableScan(table=[[default, b]])",
"\n"
@@ -118,10 +118,10 @@
"Execution Plan",
"\nPhysicalProject(col1=[$0], col2=[$1])",
"\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
- "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n PhysicalTableScan(table=[[default, b]])",
- "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col3=[$2])",
"\n PhysicalTableScan(table=[[default, b]])",
"\n"
@@ -136,18 +136,18 @@
"\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
"\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
"\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
"\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
"\n PhysicalProject(col3=[$2])",
"\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, b]])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]])",
"\n PhysicalProject(col3=[$2])",
"\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])",
"\n PhysicalTableScan(table=[[default, b]])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]])",
"\n PhysicalProject(col3=[$2])",
"\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
"\n PhysicalTableScan(table=[[default, b]])",
@@ -166,19 +166,19 @@
"\n PhysicalJoin(condition=[=($3, $4)], joinType=[left])",
"\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2],
col31=[$2])",
"\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
"\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
"\n PhysicalProject(col3=[$2])",
"\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, b]])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]],
execStrategy=[STREAMING], collation=[[]])",
- "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)],
aggType=[DIRECT])",
"\n PhysicalProject(col3=[$2], $f1=[true])",
"\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])",
"\n PhysicalTableScan(table=[[default, b]])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]])",
"\n PhysicalProject(col3=[$2])",
"\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
"\n PhysicalTableScan(table=[[default, b]])",
@@ -197,20 +197,20 @@
"\n PhysicalJoin(condition=[=($3, $4)], joinType=[left])",
"\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2],
col31=[$2])",
"\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
"\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
"\n PhysicalProject(col3=[$2])",
"\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, b]])",
- "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])",
- "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]],
execStrategy=[STREAMING], collation=[[]])",
- "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)],
aggType=[FINAL])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)],
aggType=[LEAF])",
"\n PhysicalProject(col3=[$2], $f1=[true])",
"\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]])",
"\n PhysicalProject(col3=[$2])",
"\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
"\n PhysicalTableScan(table=[[default, b]])",
@@ -228,14 +228,14 @@
"Execution Plan",
"\nPhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
"\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
- "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col1=[$0], col2=[$1])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col2=[$1])",
"\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER
NOT NULL)])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col2=[$1])",
"\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER
NOT NULL)])",
"\n PhysicalTableScan(table=[[default, a]])",
@@ -253,19 +253,19 @@
"\n PhysicalJoin(condition=[=($2, $3)], joinType=[left])",
"\n PhysicalProject(col1=[$0], col2=[$1], col21=[$1])",
"\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
- "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col1=[$0], col2=[$1])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col2=[$1])",
"\n PhysicalFilter(condition=[=($2,
CAST(_UTF-8'foo'):INTEGER NOT NULL)])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
- "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)],
aggType=[DIRECT])",
"\n PhysicalProject(col2=[$1], $f1=[true])",
"\n PhysicalFilter(condition=[=($2,
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col2=[$1])",
"\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'lorem'):INTEGER
NOT NULL)])",
"\n PhysicalTableScan(table=[[default, a]])",
@@ -277,28 +277,28 @@
"sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1,
COUNT(*) FROM a WHERE col2 IN (SELECT col2 FROM a WHERE col3 = 'foo') AND col2
NOT IN (SELECT col2 FROM a WHERE col3 = 'bar') AND col2 IN (SELECT col2 FROM a
WHERE col3 = 'lorem') GROUP BY col1",
"output": [
"Execution Plan",
- "\nPhysicalAggregate(group=[{0}], agg#0=[COUNT($1)])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
- "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()])",
+ "\nPhysicalAggregate(group=[{0}], agg#0=[COUNT($1)],
aggType=[FINAL])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()],
aggType=[LEAF])",
"\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
"\n PhysicalProject(col1=[$0], col2=[$1])",
"\n PhysicalFilter(condition=[IS NOT TRUE($4)])",
"\n PhysicalJoin(condition=[=($2, $3)], joinType=[left])",
"\n PhysicalProject(col1=[$0], col2=[$1], col21=[$1])",
"\n PhysicalJoin(condition=[=($1, $2)],
joinType=[semi])",
- "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col1=[$0], col2=[$1])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col2=[$1])",
"\n PhysicalFilter(condition=[=($2,
CAST(_UTF-8'foo'):INTEGER NOT NULL)])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]],
execStrategy=[STREAMING], collation=[[]])",
- "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+ "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)],
aggType=[DIRECT])",
"\n PhysicalProject(col2=[$1], $f1=[true])",
"\n PhysicalFilter(condition=[=($2,
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col2=[$1])",
"\n PhysicalFilter(condition=[=($2,
CAST(_UTF-8'lorem'):INTEGER NOT NULL)])",
"\n PhysicalTableScan(table=[[default, a]])",
@@ -307,6 +307,59 @@
}
]
},
+ "physical_opt_group_trim_enabled": {
+ "queries": [
+ {
+ "description": "SQL hint based group by optimization with partitioned
aggregated values and group trim enabled",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+
aggOptions(is_leaf_return_final_result='true', is_enable_group_trim='true') */
col1, COUNT(DISTINCT col2) AS cnt FROM a WHERE col3 >= 0 GROUP BY col1 ORDER BY
cnt DESC LIMIT 10",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)],
aggType=[FINAL], leafReturnFinalResult=[true], limit=[10])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+ "\n PhysicalAggregate(group=[{0}],
agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], leafReturnFinalResult=[true],
collations=[[1 DESC]], limit=[10])",
+ "\n PhysicalFilter(condition=[>=($2, 0)])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "SQL hint based group by optimization with group trim
enabled without returning group key",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+
aggOptions(is_enable_group_trim='true') */ COUNT(DISTINCT col2) AS cnt FROM a
WHERE a.col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])",
+ "\n PhysicalProject(cnt=[$1])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)],
aggType=[FINAL], limit=[10])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+ "\n PhysicalAggregate(group=[{0}],
agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], collations=[[1 DESC]], limit=[10])",
+ "\n PhysicalFilter(condition=[>=($2, 0)])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "SQL hint based distinct optimization with group trim
enabled",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+
aggOptions(is_enable_group_trim='true') */ DISTINCT col1, col2 FROM a WHERE
col3 >= 0 LIMIT 10",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalSort(fetch=[10])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(fetch=[10])",
+ "\n PhysicalAggregate(group=[{0, 1}], aggType=[FINAL],
limit=[10])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])",
+ "\n PhysicalAggregate(group=[{0, 1}], aggType=[LEAF],
limit=[10])",
+ "\n PhysicalFilter(condition=[>=($2, 0)])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ }
+ ]
+ },
"physical_opt_misc_auto_identity": {
"queries": [
{
@@ -315,20 +368,20 @@
"output": [
"Execution Plan",
"\nPhysicalProject(EXPR$0=[$1], col3=[$0])",
- "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)])",
- "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
- "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)],
aggType=[FINAL])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]])",
+ "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()],
aggType=[LEAF])",
"\n PhysicalJoin(condition=[=($0, $2)], joinType=[semi])",
- "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col2=[$1], col3=[$2])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalAggregate(group=[{0}])",
+ "\n PhysicalAggregate(group=[{0}], aggType=[DIRECT])",
"\n PhysicalUnion(all=[true])",
- "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col2=[$1])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]],
execStrategy=[STREAMING], collation=[[]])",
+ "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalProject(col2=[$1])",
"\n PhysicalFilter(condition=[=($2,
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
"\n PhysicalTableScan(table=[[default, a]])",
@@ -345,7 +398,7 @@
"output": [
"Execution Plan",
"\nPhysicalSort(sort0=[$0], dir0=[ASC])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalProject(col2=[$1], col3=[$2])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
@@ -358,7 +411,7 @@
"output": [
"Execution Plan",
"\nPhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
"\n PhysicalProject(col2=[$1], col3=[$2])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
@@ -372,7 +425,7 @@
"output": [
"Execution Plan",
"\nPhysicalSort(sort0=[$0], dir0=[ASC], offset=[10], fetch=[11])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[21])",
"\n PhysicalProject(col2=[$1], col3=[$2])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
@@ -386,7 +439,7 @@
"output": [
"Execution Plan",
"\nPhysicalSort(offset=[10], fetch=[11])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(fetch=[21])",
"\n PhysicalProject(col2=[$1], col3=[$2])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
@@ -400,9 +453,9 @@
"output": [
"Execution Plan",
"\nPhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
- "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()])",
+ "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()],
aggType=[DIRECT])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
@@ -413,9 +466,9 @@
"output": [
"Execution Plan",
"\nPhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
- "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
"\n PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
- "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()])",
+ "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()],
aggType=[DIRECT])",
"\n PhysicalTableScan(table=[[default, b]])",
"\n"
]
@@ -441,7 +494,7 @@
"sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN
PLAN FOR SELECT col2, COUNT(*) FROM a WHERE col1 = 'foo' GROUP BY col2",
"output": [
"Execution Plan",
- "\nPhysicalAggregate(group=[{1}], agg#0=[COUNT()])",
+ "\nPhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT],
limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
@@ -455,9 +508,9 @@
"\nPhysicalProject(rnk=[$3], col1=[$0])",
"\n PhysicalFilter(condition=[=($3, 1)])",
"\n PhysicalWindow(window#0=[window(partition {1} order by [2]
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n PhysicalAggregate(group=[{0, 1, 2}])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
- "\n PhysicalAggregate(group=[{0, 1, 2}])",
+ "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF],
limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
@@ -469,9 +522,9 @@
"output": [
"Execution Plan",
"\nPhysicalSort(offset=[100], fetch=[400])",
- "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
- "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()])",
+ "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)],
aggType=[FINAL])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()],
aggType=[LEAF], limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
@@ -487,12 +540,12 @@
"output": [
"Execution Plan",
"\nPhysicalJoin(condition=[=($0, $2)], joinType=[semi])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(fetch=[100000])",
"\n PhysicalProject(col2=[$1], col3=[$2])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(fetch=[100000])",
"\n PhysicalProject(col1=[$0])",
"\n PhysicalTableScan(table=[[default, b]])",
@@ -505,14 +558,14 @@
"output": [
"Execution Plan",
"\nPhysicalProject(EXPR$0=[$1], col2=[$0])",
- "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()],
aggType=[DIRECT])",
"\n PhysicalJoin(condition=[=($0, $1)], joinType=[semi])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(fetch=[100000])",
"\n PhysicalProject(col2=[$1])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(fetch=[100000])",
"\n PhysicalProject(col1=[$0])",
"\n PhysicalTableScan(table=[[default, b]])",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]