Taewoo Kim has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1542
Change subject: Let SPLIT operator work as expected ...................................................................... Let SPLIT operator work as expected - Let SPLIT operator function as expected in the optimization framework by referencing the information for the REPLICATE operator Change-Id: I999288ea4cf286e34d735a840843bf161876d3e3 --- M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java 8 files changed, 26 insertions(+), 18 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/42/1542/1 diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java index f883687..852c392 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java @@ -100,4 +100,13 @@ return createPropagatingAllInputsTypeEnvironment(ctx); } + public boolean isBlocker() { + for (boolean requiresMaterialization : outputMaterializationFlags) { + if (requiresMaterialization) { + return true; + } + } + return false; + } + } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java index 2d2fd0f..0499327 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java @@ -42,12 +42,4 @@ return visitor.visitReplicateOperator(this, arg); } - public boolean isBlocker() { - for (boolean requiresMaterialization : outputMaterializationFlags) { - if (requiresMaterialization) { - return true; - } - } - return false; - } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java index 1a61f2e..2960903 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java @@ -25,14 +25,13 @@ import java.util.Map.Entry; import org.apache.commons.lang3.mutable.Mutable; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.api.job.JobSpecification; @@ -107,8 +106,9 @@ Mutable<ILogicalOperator> child = entry.getKey(); List<Mutable<ILogicalOperator>> parents = entry.getValue(); if (parents.size() > 1) { - if (child.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) { - ReplicateOperator rop = (ReplicateOperator) child.getValue(); + if (child.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE + || child.getValue().getOperatorTag() == LogicalOperatorTag.SPLIT) { + AbstractReplicateOperator rop = (AbstractReplicateOperator) child.getValue(); if (rop.isBlocker()) { // make the order of the graph edges consistent with the order of rop's outputs List<Mutable<ILogicalOperator>> outputs = rop.getOutputs(); diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java index 60275dd..f51c9ea 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java @@ -146,9 +146,10 @@ } } - // TODO: Deal with replicate properly. Currently, we just clear the expr equivalence map, since we want to avoid incorrect expression replacement - // (the resulting new variables should be assigned live below a replicate). - if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE) { + // TODO: Deal with replicate properly. Currently, we just clear the expr equivalence map, + // since we want to avoid incorrect expression replacement + // (the resulting new variables should be assigned live below a replicate/split). + if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE || op.getOperatorTag() == LogicalOperatorTag.SPLIT) { exprEqClassMap.clear(); return modified; } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java index 474cc73..5a4cacd 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java @@ -459,9 +459,10 @@ private void computeClusters(Mutable<ILogicalOperator> parentRef, Mutable<ILogicalOperator> opRef, MutableInt currentClusterId) { - // only replicate operator has multiple outputs + // only replicate or split operator has multiple outputs int outputIndex = 0; - if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) { + if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE + || opRef.getValue().getOperatorTag() == LogicalOperatorTag.SPLIT) { ReplicateOperator rop = (ReplicateOperator) opRef.getValue(); List<Mutable<ILogicalOperator>> outputs = rop.getOutputs(); for (outputIndex = 0; outputIndex < outputs.size(); outputIndex++) { diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java index 2d57e8d..88c0ea9 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java @@ -87,6 +87,7 @@ || op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE || op2.getOperatorTag() == LogicalOperatorTag.PROJECT || op2.getOperatorTag() == LogicalOperatorTag.REPLICATE + || op2.getOperatorTag() == LogicalOperatorTag.SPLIT || op2.getOperatorTag() == LogicalOperatorTag.UNIONALL) { return new Pair<Boolean, Boolean>(false, false); } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java index aab6d12..29998c2 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java @@ -58,7 +58,7 @@ LogicalOperatorTag tag2 = op2.getOperatorTag(); if (tag2 == LogicalOperatorTag.INNERJOIN || tag2 == LogicalOperatorTag.LEFTOUTERJOIN - || tag2 == LogicalOperatorTag.REPLICATE) { + || tag2 == LogicalOperatorTag.REPLICATE || tag2 == LogicalOperatorTag.SPLIT) { return false; } else { // not a join boolean res = propagateSelectionRec(opRef, opRef2); diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index 6fdcfdf..e535bb3 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -73,6 +73,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.SplitPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator; @@ -236,6 +237,9 @@ op.setPhysicalOperator(new ReplicatePOperator()); break; } + case SPLIT: { + op.setPhysicalOperator(new SplitPOperator()); + } case SCRIPT: { op.setPhysicalOperator(new StringStreamingScriptPOperator()); break; -- To view, visit https://asterix-gerrit.ics.uci.edu/1542 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I999288ea4cf286e34d735a840843bf161876d3e3 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Taewoo Kim <wangs...@gmail.com>