Taewoo Kim has submitted this change and it was merged. Change subject: Index-only plan step 2: Added SplitOperator ......................................................................
Index-only plan step 2: Added SplitOperator - Introduced SplitOperator, which sends each tuple to only one output frame, unlike the ReplicateOperator, which propagates each tuple to all output frames. - Removed PartitioningSplitOperator and PartitioningSplitOperatorDescriptor, which are not functional (they lack a physical operator). - Added a unit test case for SplitOperatorDescriptor in PushRuntimeTest. Change-Id: Ice190827513cd8632764b52c9d0338d65c830740 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1196 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Yingyi Bu <buyin...@gmail.com> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java A hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java D hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/PartitioningSplitOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java A 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java A hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java A hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java D hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java A hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl A hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl A hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl M hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java R hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java A 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java R hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java 37 files changed, 778 insertions(+), 628 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java index e04be6f..4a79387 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java @@ -44,19 +44,19 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -197,15 +197,12 @@ } @Override - public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException { - for (Mutable<ILogicalExpression> expr : op.getExpressions()) { - sweepExpression(expr.getValue(), op); - } + public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { return null; } @Override - public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { return null; } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java index 143cf0b..874cc7c 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java @@ -26,8 +26,8 @@ import 
java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; +import java.util.Set; import org.apache.asterix.lang.common.util.FunctionUtil; import org.apache.asterix.om.base.AString; @@ -67,18 +67,18 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; import 
org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; @@ -508,13 +508,12 @@ } @Override - public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) - throws AlgebricksException { + public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { return visitSingleInputOperator(op); } @Override - public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { return visitSingleInputOperator(op); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java index c7b927e..eeb2c2a 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java @@ -53,12 +53,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -276,13 +276,12 @@ } @Override - public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) - throws AlgebricksException { + public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { return visitSingleInputOperator(op); } @Override - public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { return visitSingleInputOperator(op); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java index ef0f9da..ccf0aeb 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java @@ -33,17 +33,17 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -153,12 +153,12 @@ } @Override - public Boolean visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException { - return false; + public Boolean visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + return visitInputs(op); } @Override - public Boolean visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + public Boolean visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { return visitInputs(op); } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java index d39a8c8..cc7a75f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java @@ -38,13 +38,13 @@ MATERIALIZE, NESTEDTUPLESOURCE, ORDER, - PARTITIONINGSPLIT, PROJECT, REPLICATE, RUNNINGAGGREGATE, SCRIPT, SELECT, SINK, + SPLIT, SUBPLAN, TOKENIZE, UNIONALL, diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java index 893fb32..1d20e08 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java @@ -46,13 +46,13 @@ NESTED_LOOP, NESTED_TUPLE_SOURCE, ONE_TO_ONE_EXCHANGE, - PARTITIONINGSPLIT, PRE_CLUSTERED_GROUP_BY, PRE_SORTED_DISTINCT_BY, RANDOM_PARTITION_EXCHANGE, RANDOM_MERGE_EXCHANGE, RANGE_PARTITION_EXCHANGE, RANGE_PARTITION_MERGE_EXCHANGE, + REPLICATE, RTREE_SEARCH, RUNNING_AGGREGATE, SINGLE_PARTITION_INVERTED_INDEX_SEARCH, @@ -60,7 +60,7 @@ SINK_WRITE, SORT_GROUP_BY, SORT_MERGE_EXCHANGE, - REPLICATE, + SPLIT, STABLE_SORT, STATS, STREAM_LIMIT, diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java new file mode 100644 
index 0000000..f883687 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.core.algebra.operators.logical; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy; +import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; + +/** + * Abstract base class for the two replication-related operators: replicate and split. + * The replicate operator propagates every frame to all of its output branches.
+ * That is, each tuple is propagated to every output branch. + * The split operator propagates each tuple in a frame to only one output branch. Both share the state kept here: the output arity, per-output materialization flags and the list of output operators. NOTE(review): the two-argument constructor does not check that the flag array length matches outputArity. + */ +public abstract class AbstractReplicateOperator extends AbstractLogicalOperator { + + private int outputArity; + protected boolean[] outputMaterializationFlags; + private List<Mutable<ILogicalOperator>> outputs; + + public AbstractReplicateOperator(int outputArity) { + this.outputArity = outputArity; + this.outputMaterializationFlags = new boolean[outputArity]; + this.outputs = new ArrayList<>(); + } + + public AbstractReplicateOperator(int outputArity, boolean[] outputMaterializationFlags) { + this.outputArity = outputArity; + this.outputMaterializationFlags = outputMaterializationFlags; + this.outputs = new ArrayList<>(); + } + + @Override + public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) + throws AlgebricksException { + return false; + } + + @Override + public VariablePropagationPolicy getVariablePropagationPolicy() { + return VariablePropagationPolicy.ALL; + } + + @Override + public boolean isMap() { + return true; + } + + @Override + public void recomputeSchema() { + schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema()); + } + + public void substituteVar(LogicalVariable v1, LogicalVariable v2) { + // do nothing: this base class holds no expressions, so there are no variable references to substitute + } + + public int getOutputArity() { + return outputArity; + } + + public void setOutputMaterializationFlags(boolean[] outputMaterializationFlags) { + this.outputMaterializationFlags = outputMaterializationFlags; + } + + public boolean[] getOutputMaterializationFlags() { + return outputMaterializationFlags; + } + + public List<Mutable<ILogicalOperator>> getOutputs() { + return outputs; + } + + @Override + public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException { + return createPropagatingAllInputsTypeEnvironment(ctx); + } + +} diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/PartitioningSplitOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/PartitioningSplitOperator.java deleted file mode 100644 index 1c5f324..0000000 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/PartitioningSplitOperator.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.algebricks.core.algebra.operators.logical; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang3.mutable.Mutable; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; -import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; -import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; -import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; -import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy; -import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; -import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; -import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; -import org.apache.hyracks.algebricks.runtime.operators.std.PartitioningSplitOperatorDescriptor; - -/** - * Partitions it's input based on a given list of expressions. - * Each expression is assumed to return true/false, - * and there is exactly one output branch per expression (optionally, plus one default branch). - * For each input tuple, the expressions are evaluated one-by-one, - * and the tuple is written to first output branch whose corresponding - * expression evaluates to true. - * If all expressions evaluate to false, then - * the tuple is written to the default output branch, if any was given. - * If no output branch was given, then such tuples are simply dropped. - * Given N expressions there may be N or N+1 output branches because the default output branch may be separate from the regular output branches. 
- */ -public class PartitioningSplitOperator extends AbstractLogicalOperator { - - private final List<Mutable<ILogicalExpression>> expressions; - private final int defaultBranchIndex; - - public PartitioningSplitOperator(List<Mutable<ILogicalExpression>> expressions, int defaultBranchIndex) throws AlgebricksException { - this.expressions = expressions; - this.defaultBranchIndex = defaultBranchIndex; - // Check that the default output branch index is in [0, N], where N is the number of expressions. - if (defaultBranchIndex != PartitioningSplitOperatorDescriptor.NO_DEFAULT_BRANCH - && defaultBranchIndex > expressions.size()) { - throw new AlgebricksException("Default branch index out of bounds. Number of exprs given: " - + expressions.size() + ". The maximum default branch index may therefore be: " + expressions.size()); - } - } - - public List<Mutable<ILogicalExpression>> getExpressions() { - return expressions; - } - - public int getDefaultBranchIndex() { - return defaultBranchIndex; - } - - public int getNumOutputBranches() { - return (defaultBranchIndex == expressions.size()) ? 
expressions.size() + 1 : expressions.size(); - } - - @Override - public LogicalOperatorTag getOperatorTag() { - return LogicalOperatorTag.PARTITIONINGSPLIT; - } - - @Override - public void recomputeSchema() { - schema = new ArrayList<LogicalVariable>(); - schema.addAll(inputs.get(0).getValue().getSchema()); - } - - @Override - public VariablePropagationPolicy getVariablePropagationPolicy() { - return VariablePropagationPolicy.ALL; - } - - @Override - public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException { - boolean b = false; - for (int i = 0; i < expressions.size(); i++) { - if (visitor.transform(expressions.get(i))) { - b = true; - } - } - return b; - } - - @Override - public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException { - return visitor.visitPartitioningSplitOperator(this, arg); - } - - @Override - public boolean isMap() { - return false; - } - - @Override - public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException { - return createPropagatingAllInputsTypeEnvironment(ctx); - } - -} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java index 834107c..2d2fd0f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java @@ -18,36 +18,18 @@ */ package org.apache.hyracks.algebricks.core.algebra.operators.logical; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang3.mutable.Mutable; import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; -import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; -import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; -import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy; -import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; -import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; -public class ReplicateOperator extends AbstractLogicalOperator { - - private int outputArity; - private boolean[] outputMaterializationFlags; - private List<Mutable<ILogicalOperator>> outputs; +public class ReplicateOperator extends AbstractReplicateOperator { public ReplicateOperator(int outputArity) { - this.outputArity = outputArity; - this.outputMaterializationFlags = new boolean[outputArity]; - this.outputs = new ArrayList<>(); + super(outputArity); } public ReplicateOperator(int outputArity, boolean[] outputMaterializationFlags) { - this.outputArity = outputArity; - this.outputMaterializationFlags = outputMaterializationFlags; - this.outputs = new ArrayList<>(); + super(outputArity, outputMaterializationFlags); } @Override @@ -58,52 +40,6 @@ @Override public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException { return visitor.visitReplicateOperator(this, arg); - } - - @Override - public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) - throws AlgebricksException { - return false; - } - - @Override - public VariablePropagationPolicy getVariablePropagationPolicy() { - return VariablePropagationPolicy.ALL; - } - - @Override - public boolean isMap() { - return true; - } - - 
@Override - public void recomputeSchema() { - schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema()); - } - - public void substituteVar(LogicalVariable v1, LogicalVariable v2) { - // do nothing - } - - public int getOutputArity() { - return outputArity; - } - - public void setOutputMaterializationFlags(boolean[] outputMaterializationFlags) { - this.outputMaterializationFlags = outputMaterializationFlags; - } - - public boolean[] getOutputMaterializationFlags() { - return outputMaterializationFlags; - } - - public List<Mutable<ILogicalOperator>> getOutputs() { - return outputs; - } - - @Override - public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException { - return createPropagatingAllInputsTypeEnvironment(ctx); } public boolean isBlocker() { diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java new file mode 100644 index 0000000..a996673 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java @@ -0,0 +1,65 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hyracks.algebricks.core.algebra.operators.logical; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; + +/** + * Split Operator receives an expression. This expression will be evaluated to an integer value during the runtime. + * Based on its value, it propagates each tuple to the corresponding output frame. (e.g., first output = 0, ...) + * Thus, unlike Replicate operator that does unconditional propagation to all outputs, + * this does a conditional propagate operation. + */ +public class SplitOperator extends AbstractReplicateOperator { + + // Expression that keeps the output branch information for each tuple + private final Mutable<ILogicalExpression> branchingExpression; + + public SplitOperator(int outputArity, Mutable<ILogicalExpression> branchingExpression) { + super(outputArity); + this.branchingExpression = branchingExpression; + } + + @Override + public LogicalOperatorTag getOperatorTag() { + return LogicalOperatorTag.SPLIT; + } + + @Override + public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException { + return visitor.visitSplitOperator(this, arg); + } + + public Mutable<ILogicalExpression> getBranchingExpression() { + return branchingExpression; + } + + @Override + public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException { + return visitor.transform(branchingExpression); + } + + @Override + public void substituteVar(LogicalVariable v1, LogicalVariable v2) { 
+ getBranchingExpression().getValue().substituteVar(v1, v2); + } + +} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java index b999493..d278078 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java @@ -48,13 +48,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -175,12 +175,12 @@ } @Override - public Long visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException { + public Long visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { return op.getInputs().get(0).getValue().accept(this, arg); } @Override - public Long visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + public Long visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { return op.getInputs().get(0).getValue().accept(this, arg); } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java index 5e2a041..b259869 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java @@ -30,7 +30,6 @@ import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.common.utils.ListSet; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass; @@ -67,13 +66,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -405,12 +404,6 @@ } @Override - public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, IOptimizationContext ctx) - throws AlgebricksException { - throw new NotImplementedException(); - } - - @Override public Void visitProjectOperator(ProjectOperator op, IOptimizationContext ctx) throws AlgebricksException { propagateFDsAndEquivClassesForUsedVars(op, ctx, op.getVariables()); return null; @@ -423,6 +416,12 @@ } @Override + public Void visitSplitOperator(SplitOperator op, IOptimizationContext ctx) throws AlgebricksException { + propagateFDsAndEquivClasses(op, ctx); + return null; + } + + @Override public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext ctx) throws AlgebricksException { propagateFDsAndEquivClasses(op, ctx); return null; diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java index c0e8b34..7f34e8b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java @@ -55,13 +55,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -277,18 +277,6 @@ } @Override - public Boolean visitPartitioningSplitOperator(PartitioningSplitOperator op, ILogicalOperator arg) - throws AlgebricksException { - AbstractLogicalOperator aop = (AbstractLogicalOperator) arg; - if 
(aop.getOperatorTag() != LogicalOperatorTag.PARTITIONINGSPLIT) { - return Boolean.FALSE; - } - PartitioningSplitOperator partitionOpArg = (PartitioningSplitOperator) copyAndSubstituteVar(op, arg); - boolean isomorphic = compareExpressions(op.getExpressions(), partitionOpArg.getExpressions()); - return isomorphic; - } - - @Override public Boolean visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg) throws AlgebricksException { AbstractLogicalOperator aop = (AbstractLogicalOperator) arg; if (aop.getOperatorTag() != LogicalOperatorTag.REPLICATE) { @@ -298,6 +286,17 @@ } @Override + public Boolean visitSplitOperator(SplitOperator op, ILogicalOperator arg) throws AlgebricksException { + AbstractLogicalOperator aop = (AbstractLogicalOperator) arg; + if (aop.getOperatorTag() != LogicalOperatorTag.SPLIT) { + return Boolean.FALSE; + } + SplitOperator sOpArg = (SplitOperator) copyAndSubstituteVar(op, arg); + boolean isomorphic = op.getBranchingExpression().getValue().equals(sOpArg.getBranchingExpression().getValue()); + return isomorphic; + } + + @Override public Boolean visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg) throws AlgebricksException { AbstractLogicalOperator aop = (AbstractLogicalOperator) arg; if (aop.getOperatorTag() != LogicalOperatorTag.MATERIALIZE) { diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java index 965008c..58b31f8 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java @@ -22,8 +22,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; +import java.util.Set; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; @@ -57,13 +57,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -165,14 +165,13 @@ } @Override - public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, ILogicalOperator arg) - throws AlgebricksException { + public Void visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg) throws AlgebricksException { mapVariablesStandard(op, arg); return null; } 
@Override - public Void visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg) throws AlgebricksException { + public Void visitSplitOperator(SplitOperator op, ILogicalOperator arg) throws AlgebricksException { mapVariablesStandard(op, arg); return null; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java index 4241d84..f4b3195 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java @@ -52,18 +52,18 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; +import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; @@ -384,14 +384,6 @@ } @Override - public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, ILogicalOperator arg) - throws AlgebricksException { - PartitioningSplitOperator opCopy = new PartitioningSplitOperator( - exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions()), op.getDefaultBranchIndex()); - return opCopy; - } - - @Override public ILogicalOperator visitProjectOperator(ProjectOperator op, ILogicalOperator arg) throws AlgebricksException { ProjectOperator opCopy = new ProjectOperator(deepCopyVariableList(op.getVariables())); deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); @@ -410,6 +402,13 @@ } @Override + public ILogicalOperator visitSplitOperator(SplitOperator op, ILogicalOperator arg) throws AlgebricksException { + SplitOperator opCopy = new SplitOperator(op.getOutputArity(), op.getBranchingExpression()); + deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); + return opCopy; + } + + @Override public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg) throws AlgebricksException { MaterializeOperator opCopy = new MaterializeOperator(); diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java index 8d3644d..7e92869 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java @@ -47,13 +47,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -174,13 +174,6 @@ } @Override - public Void 
visitPartitioningSplitOperator(PartitioningSplitOperator op, IOptimizationContext arg) - throws AlgebricksException { - // TODO Auto-generated method stub - return null; - } - - @Override public Void visitProjectOperator(ProjectOperator op, IOptimizationContext context) throws AlgebricksException { propagateCardinalityAndFrameNumber(op, context); return null; @@ -193,6 +186,11 @@ } @Override + public Void visitSplitOperator(SplitOperator op, IOptimizationContext arg) throws AlgebricksException { + return null; + } + + @Override public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext arg) throws AlgebricksException { // TODO Auto-generated method stub return null; diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java index fde6a28..442899f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java @@ -52,13 +52,14 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -66,7 +67,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; @@ -167,16 +167,13 @@ } @Override - public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) - throws AlgebricksException { - ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>(); - deepCopyExpressionRefs(newExpressions, op.getExpressions()); - return new PartitioningSplitOperator(newExpressions, op.getDefaultBranchIndex()); + public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + return new ReplicateOperator(op.getOutputArity()); } @Override - public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { - return new 
ReplicateOperator(op.getOutputArity()); + public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { + return new SplitOperator(op.getOutputArity(), op.getBranchingExpression()); } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java index f01b20f..9f1acea 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java @@ -49,13 +49,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -174,13 +174,12 @@ } @Override - public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, IOptimizationContext ctx) - throws AlgebricksException { + public Void visitReplicateOperator(ReplicateOperator op, IOptimizationContext ctx) throws AlgebricksException { return null; } @Override - public Void visitReplicateOperator(ReplicateOperator op, IOptimizationContext ctx) throws AlgebricksException { + public Void visitSplitOperator(SplitOperator op, IOptimizationContext arg) throws AlgebricksException { return null; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java index 10659b1..3645aff 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java @@ -48,18 +48,18 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -156,11 +156,6 @@ } @Override - public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException { - return null; - } - - @Override public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException { return null; } @@ -255,6 +250,11 @@ } @Override + public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { + return null; + } + + @Override public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException { return null; } diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java index d35153a..a746cf2 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java @@ -46,18 +46,18 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -179,12 +179,6 @@ } @Override - public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException { - standardLayout(op); - return null; - } - - @Override public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException { schemaVariables.addAll(op.getVariables()); return null; @@ -288,6 +282,12 @@ } @Override + public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { + standardLayout(op); + return null; + } + + @Override public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException { standardLayout(op); return null; diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java index f2da7c4..7345928 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java @@ -46,19 +46,19 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -213,16 +213,6 @@ throws AlgebricksException { for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) { oe.second.getValue().substituteVar(pair.first, pair.second); - } - substVarTypes(op, pair); - return null; - 
} - - @Override - public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, - Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException { - for (Mutable<ILogicalExpression> e : op.getExpressions()) { - e.getValue().substituteVar(pair.first, pair.second); } substVarTypes(op, pair); return null; @@ -414,6 +404,13 @@ } @Override + public Void visitSplitOperator(SplitOperator op, Pair<LogicalVariable, LogicalVariable> arg) + throws AlgebricksException { + op.substituteVar(arg.first, arg.second); + return null; + } + + @Override public Void visitMaterializeOperator(MaterializeOperator op, Pair<LogicalVariable, LogicalVariable> arg) throws AlgebricksException { return null; diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java index 0d50dc2..cfe2a37 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java @@ -52,13 +52,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -227,14 +227,6 @@ public Void visitOrderOperator(OrderOperator op, Void arg) { for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) { oe.second.getValue().getUsedVariables(usedVariables); - } - return null; - } - - @Override - public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) { - for (Mutable<ILogicalExpression> e : op.getExpressions()) { - e.getValue().getUsedVariables(usedVariables); } return null; } @@ -443,6 +435,14 @@ } @Override + public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { + for (Mutable<ILogicalOperator> outputOp : op.getOutputs()) { + VariableUtilities.getUsedVariables(outputOp.getValue(), usedVariables); + } + return null; + } + + @Override public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException { return null; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java new file mode 100644 index 0000000..1d13163 --- /dev/null +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; + +public abstract class AbstractReplicatePOperator extends AbstractPhysicalOperator { + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + 
IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { + return emptyUnaryRequirements(); + } + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone(); + } + + @Override + public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) { + int[] inputDependencyLabels = new int[] { 0 }; + AbstractReplicateOperator rop = (AbstractReplicateOperator) op; + int[] outputDependencyLabels = new int[rop.getOutputArity()]; + // change the labels of outputs that requires materialization to 1 + boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags(); + for (int i = 0; i < rop.getOutputArity(); i++) { + if (outputMaterializationFlags[i]) { + outputDependencyLabels[i] = 1; + } + } + return new Pair<>(inputDependencyLabels, outputDependencyLabels); + } + + @Override + public boolean expensiveThanMaterialization() { + return false; + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java index 6501aeb..74739da 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java @@ -19,24 +19,19 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; import 
org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; -import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; -import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor; +import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor; -public class ReplicatePOperator extends AbstractPhysicalOperator { +public class ReplicatePOperator extends AbstractReplicatePOperator { @Override public PhysicalOperatorTag getOperatorTag() { @@ -44,52 +39,22 @@ } @Override - public boolean isMicroOperator() { - return false; - } - - @Override - public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { - return emptyUnaryRequirements(); - } - - @Override - public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { - AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); - deliveredProperties = (StructuralPropertiesVector) 
op2.getDeliveredPhysicalProperties().clone(); - } - - @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { IOperatorDescriptorRegistry spec = builder.getJobSpec(); - RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context); + RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), + propagatedSchema, context); ReplicateOperator rop = (ReplicateOperator) op; int outputArity = rop.getOutputArity(); boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags(); - SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity, outputMaterializationFlags); + ReplicateOperatorDescriptor splitOpDesc = new ReplicateOperatorDescriptor(spec, recDescriptor, outputArity, + outputMaterializationFlags); contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc); ILogicalOperator src = op.getInputs().get(0).getValue(); builder.contributeGraphEdge(src, 0, op, 0); - } - - @Override - public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) { - int[] inputDependencyLabels = new int[] { 0 }; - ReplicateOperator rop = (ReplicateOperator) op; - int[] outputDependencyLabels = new int[rop.getOutputArity()]; - // change the labels of outputs that requires materialization to 1 - boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags(); - for (int i = 0; i < rop.getOutputArity(); i++) { - if (outputMaterializationFlags[i]) { - outputDependencyLabels[i] = 1; - } - } - return new Pair<>(inputDependencyLabels, outputDependencyLabels); } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java new file mode 100644 index 0000000..3b8aaab --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; + +public class SplitPOperator extends AbstractReplicatePOperator { + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.SPLIT; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + SplitOperator sop = (SplitOperator) op; + int outputArity = sop.getOutputArity(); + + IOperatorDescriptorRegistry spec = builder.getJobSpec(); + RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), + propagatedSchema, context); + + IExpressionRuntimeProvider expressionRuntimeProvider 
= context.getExpressionRuntimeProvider(); + IScalarEvaluatorFactory brachingExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory( + sop.getBranchingExpression().getValue(), context.getTypeEnvironment(op), inputSchemas, context); + + IBinaryIntegerInspectorFactory intInsepctorFactory = context.getBinaryIntegerInspectorFactory(); + + SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity, + brachingExprEvalFactory, intInsepctorFactory); + + contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc); + ILogicalOperator src = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, op, 0); + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java index 7e83880..d3dd166 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java @@ -41,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; @@ -49,13 +50,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -63,7 +64,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; @@ -236,15 +236,6 @@ } @Override - public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Integer indent) 
- throws AlgebricksException { - addIndent(indent).append("partitioning-split ("); - pprintExprList(op.getExpressions(), indent); - buffer.append(")"); - return null; - } - - @Override public Void visitSubplanOperator(SubplanOperator op, Integer indent) throws AlgebricksException { addIndent(indent).append("subplan {"); printNestedPlans(op, indent); @@ -363,6 +354,13 @@ } @Override + public Void visitSplitOperator(SplitOperator op, Integer indent) throws AlgebricksException { + Mutable<ILogicalExpression> branchingExpression = op.getBranchingExpression(); + addIndent(indent).append("split " + branchingExpression.getValue().accept(exprVisitor, indent)); + return null; + } + + @Override public Void visitMaterializeOperator(MaterializeOperator op, Integer indent) throws AlgebricksException { addIndent(indent).append("materialize"); return null; diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java index c0f9718..f5ff8b4 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java @@ -34,18 +34,18 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -82,10 +82,10 @@ public R visitProjectOperator(ProjectOperator op, T arg) throws AlgebricksException; - public R visitPartitioningSplitOperator(PartitioningSplitOperator op, T arg) throws AlgebricksException; - public R visitReplicateOperator(ReplicateOperator op, T arg) throws AlgebricksException; + public R visitSplitOperator(SplitOperator op, T arg) throws AlgebricksException; + public R visitMaterializeOperator(MaterializeOperator op, T arg) throws AlgebricksException; public R visitScriptOperator(ScriptOperator op, T arg) throws AlgebricksException; diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java index 39cac06..c5d7291 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java @@ -48,12 +48,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -174,13 +174,12 @@ } @Override - public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) - throws 
AlgebricksException { + public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { return visit(op); } @Override - public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { return visit(op); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java deleted file mode 100644 index 2d5c929..0000000 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.algebricks.runtime.operators.std; - -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.primitive.VoidPointable; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; -import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable; - -public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { - private static final long serialVersionUID = 1L; - public static int NO_DEFAULT_BRANCH = -1; - - private final IScalarEvaluatorFactory[] evalFactories; - private final IBinaryBooleanInspector boolInspector; - private final int defaultBranchIndex; - - public 
PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, - IScalarEvaluatorFactory[] evalFactories, IBinaryBooleanInspector boolInspector, int defaultBranchIndex, - RecordDescriptor rDesc) { - super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length); - for (int i = 0; i < evalFactories.length; i++) { - recordDescriptors[i] = rDesc; - } - this.evalFactories = evalFactories; - this.boolInspector = boolInspector; - this.defaultBranchIndex = defaultBranchIndex; - } - - @Override - public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, - final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) - throws HyracksDataException { - return new AbstractUnaryInputOperatorNodePushable() { - private final IFrameWriter[] writers = new IFrameWriter[outputArity]; - private final boolean[] isOpen = new boolean[outputArity]; - private final IFrame[] writeBuffers = new IFrame[outputArity]; - private final IScalarEvaluator[] evals = new IScalarEvaluator[outputArity]; - private final IPointable evalPointable = new VoidPointable(); - private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), - 0); - private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc); - private final FrameTupleReference frameTuple = new FrameTupleReference(); - - private final FrameTupleAppender tupleAppender = new FrameTupleAppender(); - private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount()); - private final DataOutput tupleDos = tupleBuilder.getDataOutput(); - - @Override - public void close() throws HyracksDataException { - HyracksDataException hde = null; - for (int i = 0; i < outputArity; i++) { - if (isOpen[i]) { - try { - tupleAppender.reset(writeBuffers[i], false); - tupleAppender.write(writers[i], false); - } catch (Throwable th) { - if (hde == null) { - hde = new 
HyracksDataException(); - } - hde.addSuppressed(th); - } finally { - try { - writers[i].close(); - } catch (Throwable th) { - if (hde == null) { - hde = new HyracksDataException(); - } - hde.addSuppressed(th); - } - } - } - } - if (hde != null) { - throw hde; - } - } - - @Override - public void flush() throws HyracksDataException { - for (int i = 0; i < outputArity; i++) { - tupleAppender.reset(writeBuffers[i], false); - tupleAppender.flush(writers[i]); - } - } - - @Override - public void fail() throws HyracksDataException { - HyracksDataException hde = null; - for (int i = 0; i < outputArity; i++) { - if (isOpen[i]) { - try { - writers[i].fail(); - } catch (Throwable th) { - if (hde == null) { - hde = new HyracksDataException(); - } - hde.addSuppressed(th); - } - } - } - if (hde != null) { - throw hde; - } - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - accessor.reset(buffer); - int tupleCount = accessor.getTupleCount(); - for (int i = 0; i < tupleCount; i++) { - frameTuple.reset(accessor, i); - boolean found = false; - for (int j = 0; j < evals.length; j++) { - try { - evals[j].evaluate(frameTuple, evalPointable); - } catch (AlgebricksException e) { - throw new HyracksDataException(e); - } - found = boolInspector.getBooleanValue(evalPointable.getByteArray(), - evalPointable.getStartOffset(), evalPointable.getLength()); - if (found) { - copyAndAppendTuple(j); - break; - } - } - // Optionally write to default output branch. - if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) { - copyAndAppendTuple(defaultBranchIndex); - } - } - } - - private void copyAndAppendTuple(int outputIndex) throws HyracksDataException { - // Copy tuple into tuple builder. 
- try { - tupleBuilder.reset(); - for (int i = 0; i < frameTuple.getFieldCount(); i++) { - tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), - frameTuple.getFieldLength(i)); - tupleBuilder.addFieldEndOffset(); - } - } catch (IOException e) { - throw new HyracksDataException(e); - } - tupleAppender.reset(writeBuffers[outputIndex], false); - FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, tupleBuilder.getFieldEndOffsets(), - tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()); - } - - @Override - public void open() throws HyracksDataException { - for (int i = 0; i < writers.length; i++) { - isOpen[i] = true; - writers[i].open(); - } - // Create write buffers. - for (int i = 0; i < outputArity; i++) { - writeBuffers[i] = new VSizeFrame(ctx); - // Make sure to clear all buffers, since we are reusing the tupleAppender. - tupleAppender.reset(writeBuffers[i], true); - } - // Create evaluators for partitioning. - try { - for (int i = 0; i < evalFactories.length; i++) { - evals[i] = evalFactories[i].createScalarEvaluator(ctx); - } - } catch (AlgebricksException e) { - throw new HyracksDataException(e); - } - } - - @Override - public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { - writers[index] = writer; - } - }; - } -} diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java new file mode 100644 index 0000000..2215a96 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.runtime.operators.std; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector; +import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.ActivityId; +import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import 
org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; +import org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable; + +/** + * Split operator propagates each tuple in a frame to one output branch only unlike Replicate operator. + */ +public class SplitOperatorDescriptor extends AbstractReplicateOperatorDescriptor { + private static final long serialVersionUID = 1L; + + private IScalarEvaluatorFactory brachingExprEvalFactory; + private IBinaryIntegerInspectorFactory intInsepctorFactory; + + public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity, + IScalarEvaluatorFactory brachingExprEvalFactory, IBinaryIntegerInspectorFactory intInsepctorFactory) { + super(spec, rDesc, outputArity); + this.brachingExprEvalFactory = brachingExprEvalFactory; + this.intInsepctorFactory = intInsepctorFactory; + } + + @Override + public void contributeActivities(IActivityGraphBuilder builder) { + SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode( + new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID)); + builder.addActivity(this, sma); + builder.addSourceEdge(0, sma, 0); + for (int i = 0; i < outputArity; i++) { + builder.addTargetEdge(i, sma, i); + } + } + + // The difference between SplitterMaterializerActivityNode and ReplicatorMaterializerActivityNode is that + // SplitterMaterializerActivityNode propagates each tuple to one output branch only. 
+ private final class SplitterMaterializerActivityNode extends ReplicatorMaterializerActivityNode { + private static final long serialVersionUID = 1L; + + public SplitterMaterializerActivityNode(ActivityId id) { + super(id); + } + + @Override + public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) + throws HyracksDataException { + final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs]; + final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs]; + final IPointable p = VoidPointable.FACTORY.createPointable();; + // To deal with each tuple in a frame + final FrameTupleAccessor accessor = new FrameTupleAccessor(recordDescriptors[0]);; + final FrameTupleAppender[] appenders = new FrameTupleAppender[numberOfNonMaterializedOutputs]; + final FrameTupleReference tRef = new FrameTupleReference();; + final IBinaryIntegerInspector intInsepctor = intInsepctorFactory.createBinaryIntegerInspector(ctx); + final IScalarEvaluator eval; + try { + eval = brachingExprEvalFactory.createScalarEvaluator(ctx); + } catch (AlgebricksException ae) { + throw new HyracksDataException(ae); + } + for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { + appenders[i] = new FrameTupleAppender(new VSizeFrame(ctx), true); + } + + return new AbstractUnaryInputOperatorNodePushable() { + @Override + public void open() throws HyracksDataException { + for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { + isOpen[i] = true; + writers[i].open(); + } + } + + @Override + public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException { + // Tuple based access + accessor.reset(bufferAccessor); + int tupleCount = accessor.getTupleCount(); + // The output branch number that starts from 0. + int outputBranch; + + for (int i = 0; i < tupleCount; i++) { + // Get the output branch number from the field in the given tuple. 
+ tRef.reset(accessor, i); + try { + eval.evaluate(tRef, p); + } catch (AlgebricksException ae) { + throw new HyracksDataException(ae); + } + outputBranch = intInsepctor.getIntegerValue(p.getByteArray(), p.getStartOffset(), + p.getLength()); + + // Add this tuple to the correct output frame. + FrameUtils.appendToWriter(writers[outputBranch], appenders[outputBranch], accessor, i); + } + } + + @Override + public void close() throws HyracksDataException { + HyracksDataException hde = null; + for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { + if (isOpen[i]) { + try { + appenders[i].write(writers[i], true); + writers[i].close(); + } catch (Throwable th) { + if (hde == null) { + hde = new HyracksDataException(th); + } else { + hde.addSuppressed(th); + } + } + } + } + if (hde != null) { + throw hde; + } + } + + @Override + public void fail() throws HyracksDataException { + HyracksDataException hde = null; + for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { + if (isOpen[i]) { + try { + writers[i].fail(); + } catch (Throwable th) { + if (hde == null) { + hde = new HyracksDataException(th); + } else { + hde.addSuppressed(th); + } + } + } + } + if (hde != null) { + throw hde; + } + } + + @Override + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + writers[index] = writer; + } + }; + } + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl new file mode 100644 index 0000000..0ea8a88 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl @@ -0,0 +1,4 @@ +0,first branch1 +0,first branch2 +0,first branch3 +0,first branch4 diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl new file mode 
100644 index 0000000..53588ef --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl @@ -0,0 +1,3 @@ +1,second branch1 +1,second branch2 +1,second branch3 diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl new file mode 100644 index 0000000..ceb859a --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl @@ -0,0 +1,7 @@ +0|first branch1 +1|second branch1 +0|first branch2 +1|second branch2 +0|first branch3 +1|second branch3 +0|first branch4 \ No newline at end of file diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java index 020cffe..1276518 100644 --- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java +++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java @@ -50,6 +50,7 @@ import org.apache.hyracks.algebricks.runtime.operators.std.PrinterRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor; import org.apache.hyracks.algebricks.runtime.operators.std.StreamLimitRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory; @@ -83,7 +84,7 @@ import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import 
org.apache.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor; import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor; -import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor; +import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor; import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -570,7 +571,7 @@ } @Test - public void scanSplitWrite() throws Exception { + public void scanReplicateWrite() throws Exception { final int outputArity = 2; JobSpecification spec = new JobSpecification(FRAME_SIZE); @@ -596,7 +597,69 @@ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID }); - SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity); + ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity); + + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp, + new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID }); + + IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length]; + for (int i = 0; i < outputArity; i++) { + outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] { + new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(outputFile[i])) }); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], + new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID }); + } + + spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0); + for (int i = 0; i < outputArity; i++) { + spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0); + } + + for (int i = 0; i < outputArity; i++) { + spec.addRoot(outputOp[i]); + } + AlgebricksHyracksIntegrationUtil.runJob(spec); + + for (int i = 0; i < outputArity; i++) { + 
compareFiles(inputFileName, outputFile[i].getAbsolutePath()); + } + } + + @Test + public void scanSplitWrite() throws Exception { + final int outputArity = 2; + + JobSpecification spec = new JobSpecification(FRAME_SIZE); + + String inputFileName[] = { "data/simple/int-string-part1.tbl", "data/simple/int-string-part1-split-0.tbl", + "data/simple/int-string-part1-split-1.tbl" }; + File[] inputFiles = new File[inputFileName.length]; + for (int i=0; i<inputFileName.length; i++) { + inputFiles[i] = new File(inputFileName[i]); + } + File[] outputFile = new File[outputArity]; + for (int i = 0; i < outputArity; i++) { + outputFile[i] = File.createTempFile("splitop", null); + } + + FileSplit[] inputSplits = new FileSplit[] { + new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(inputFiles[0])) }; + IFileSplitProvider intSplitProvider = new ConstantFileSplitProvider(inputSplits); + + RecordDescriptor scannerDesc = new RecordDescriptor( + new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE, + new UTF8StringSerializerDeserializer() }); + + IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE }; + + FileScanOperatorDescriptor intScanner = new FileScanOperatorDescriptor(spec, intSplitProvider, + new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc); + + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, intScanner, DEFAULT_NODES); + + SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, scannerDesc, outputArity, + new TupleFieldEvaluatorFactory(0), BinaryIntegerInspectorImpl.FACTORY); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID }); @@ -609,7 +672,7 @@ new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID }); } - spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), 
intScanner, 0, splitOp, 0); for (int i = 0; i < outputArity; i++) { spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0); } @@ -620,7 +683,7 @@ AlgebricksHyracksIntegrationUtil.runJob(spec); for (int i = 0; i < outputArity; i++) { - compareFiles(inputFileName, outputFile[i].getAbsolutePath()); + compareFiles(inputFileName[i + 1], outputFile[i].getAbsolutePath()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java similarity index 79% rename from hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java rename to hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java index 67af861..5c642ba 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.dataflow.std.misc; +package org.apache.hyracks.dataflow.std.base; import java.nio.ByteBuffer; @@ -32,28 +32,32 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; -import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; +import org.apache.hyracks.dataflow.std.misc.MaterializerTaskState; -public class SplitOperatorDescriptor extends AbstractOperatorDescriptor { - private static final long serialVersionUID = 1L; +/** + * Abstract class for two replication related operator descriptor - replicate and split + * Replicate operator propagates all frames to all output branches. + * That is, each tuple will be propagated to all output branches. + * Split operator propagates each tuple in a frame to one output branch only. 
+ */ +public abstract class AbstractReplicateOperatorDescriptor extends AbstractOperatorDescriptor { + protected static final long serialVersionUID = 1L; - private final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0; - private final static int MATERIALIZE_READER_ACTIVITY_ID = 1; + protected final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0; + protected final static int MATERIALIZE_READER_ACTIVITY_ID = 1; - private final boolean[] outputMaterializationFlags; - private final boolean requiresMaterialization; - private final int numberOfNonMaterializedOutputs; - private final int numberOfMaterializedOutputs; + protected final boolean[] outputMaterializationFlags; + protected final boolean requiresMaterialization; + protected final int numberOfNonMaterializedOutputs; + protected final int numberOfMaterializedOutputs; - public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) { + public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, + int outputArity) { this(spec, rDesc, outputArity, new boolean[outputArity]); } - public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity, - boolean[] outputMaterializationFlags) { + public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, + int outputArity, boolean[] outputMaterializationFlags) { super(spec, 1, outputArity); for (int i = 0; i < outputArity; i++) { recordDescriptors[i] = rDesc; @@ -80,16 +84,16 @@ @Override public void contributeActivities(IActivityGraphBuilder builder) { - SplitterMaterializerActivityNode sma = - new SplitterMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID)); + ReplicatorMaterializerActivityNode sma = new ReplicatorMaterializerActivityNode( + new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID)); builder.addActivity(this, sma); builder.addSourceEdge(0, sma, 0); int 
pipelineOutputIndex = 0; int activityId = MATERIALIZE_READER_ACTIVITY_ID; for (int i = 0; i < outputArity; i++) { if (outputMaterializationFlags[i]) { - MaterializeReaderActivityNode mra = - new MaterializeReaderActivityNode(new ActivityId(odId, activityId++)); + MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode( + new ActivityId(odId, activityId++)); builder.addActivity(this, mra); builder.addBlockingEdge(sma, mra); builder.addTargetEdge(i, mra, 0); @@ -99,16 +103,17 @@ } } - private final class SplitterMaterializerActivityNode extends AbstractActivityNode { + protected class ReplicatorMaterializerActivityNode extends AbstractActivityNode { private static final long serialVersionUID = 1L; - public SplitterMaterializerActivityNode(ActivityId id) { + public ReplicatorMaterializerActivityNode(ActivityId id) { super(id); } @Override public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) { + IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) + throws HyracksDataException { return new AbstractUnaryInputOperatorNodePushable() { private MaterializerTaskState state; private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs]; @@ -140,10 +145,8 @@ @Override public void flush() throws HyracksDataException { - if (!requiresMaterialization) { - for (IFrameWriter writer : writers) { - writer.flush(); - } + for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { + writers[i].flush(); } } @@ -204,7 +207,7 @@ } } - private final class MaterializeReaderActivityNode extends AbstractActivityNode { + protected class MaterializeReaderActivityNode extends AbstractActivityNode { private static final long serialVersionUID = 1L; public MaterializeReaderActivityNode(ActivityId id) { @@ -227,5 +230,4 @@ }; } } - } diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java new file mode 100644 index 0000000..0782647 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.dataflow.std.misc; + +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor; + +public class ReplicateOperatorDescriptor extends AbstractReplicateOperatorDescriptor { + private static final long serialVersionUID = 1L; + + public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) { + this(spec, rDesc, outputArity, new boolean[outputArity]); + } + + public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity, + boolean[] outputMaterializationFlags) { + super(spec, rDesc, outputArity, outputMaterializationFlags); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java similarity index 92% rename from hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java rename to hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java index 40b4251..9a56cd3 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java @@ -23,9 +23,6 @@ import java.io.FileReader; import java.io.IOException; -import org.junit.Assert; -import org.junit.Test; - import org.apache.hyracks.api.constraints.PartitionConstraintHelper; import 
org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; @@ -40,11 +37,13 @@ import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory; import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor; import org.apache.hyracks.dataflow.std.file.FileSplit; -import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor; +import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor; import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider; +import org.junit.Assert; +import org.junit.Test; -public class SplitOperatorTest extends AbstractIntegrationTest { +public class ReplicateOperatorTest extends AbstractIntegrationTest { public void compareFiles(String fileNameA, String fileNameB) throws IOException { BufferedReader fileA = new BufferedReader(new FileReader(fileNameA)); @@ -69,7 +68,7 @@ String inputFileName = "data/words.txt"; File[] outputFile = new File[outputArity]; for (int i = 0; i < outputArity; i++) { - outputFile[i] = File.createTempFile("splitop", null); + outputFile[i] = File.createTempFile("replicateop", null); outputFile[i].deleteOnExit(); } @@ -86,8 +85,8 @@ inputSplits), stringParser, stringRec); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations); - SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, locations); + ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp, locations); IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length]; for (int i = 0; i < outputArity; i++) { @@ -99,9 +98,9 @@ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], 
locations); } - spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0); for (int i = 0; i < outputArity; i++) { - spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0); + spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0); } for (int i = 0; i < outputArity; i++) { -- To view, visit https://asterix-gerrit.ics.uci.edu/1196 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ice190827513cd8632764b52c9d0338d65c830740 Gerrit-PatchSet: 19 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Taewoo Kim <wangs...@yahoo.com> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Jianfeng Jia <jianfeng....@gmail.com> Gerrit-Reviewer: Taewoo Kim <wangs...@yahoo.com> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Yingyi Bu <buyin...@gmail.com>