Yingyi Bu has submitted this change and it was merged. Change subject: ASTERIXDB-1343: support heterogeneity of computation nodes and storage nodes. ......................................................................
ASTERIXDB-1343: support heterogeneity of computation nodes and storage nodes. Change-Id: Ic21d8da2cd457aa17cc9861c0b92ac5960978e03 Reviewed-on: https://asterix-gerrit.ics.uci.edu/718 Tested-by: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java M algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java M algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java 56 files changed, 215 insertions(+), 158 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified diff --git a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java index 0d899b9..1d3a55c 100644 --- a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java +++ b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.algebricks.compiler.api; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; @@ -51,11 +52,11 @@ IExpressionEvalSizeComputer expressionEvalSizeComputer, IMergeAggregationExpressionFactory mergeAggregationExpressionFactory, IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer, - PhysicalOptimizationConfig physicalOptimizationConfig) { + PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations) { LogicalOperatorPrettyPrintVisitor prettyPrintVisitor = new LogicalOperatorPrettyPrintVisitor(); return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer, nullableTypeComputer, - physicalOptimizationConfig, prettyPrintVisitor); + physicalOptimizationConfig, clusterLocations, prettyPrintVisitor); } } @@ -77,7 +78,7 @@ int varCounter) { final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer, - nullableTypeComputer, physicalOptimizationConfig); + nullableTypeComputer, physicalOptimizationConfig, clusterLocations); oc.setMetadataDeclarations(metadata); final HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc); return new ICompiler() { @@ -92,13 +93,13 @@ IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException { AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting Job Generation.\n"); JobGenContext context = new JobGenContext(null, metadata, appContext, - serializerDeserializerProvider, hashFunctionFactoryProvider, - hashFunctionFamilyProvider, comparatorFactoryProvider, typeTraitProvider, - binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider, - nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, - expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer, - partialAggregationTypeComputer, predEvaluatorFactoryProvider, - physicalOptimizationConfig.getFrameSize(), clusterLocations); + serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider, + comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory, + binaryIntegerInspectorFactory, printerProvider, nullWriterFactory, + normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer, + nullableTypeComputer, oc, expressionEvalSizeComputer, partialAggregationTypeComputer, + predEvaluatorFactoryProvider, physicalOptimizationConfig.getFrameSize(), + clusterLocations); PlanCompiler pc = new PlanCompiler(context); return pc.compilePlan(plan, null, jobEventListenerFactory); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java index 9d68fae..707a7db 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java @@ -22,7 +22,6 @@ import java.util.Map; import org.apache.commons.lang3.mutable.Mutable; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode; @@ -66,7 +65,7 @@ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) - throws AlgebricksException; + throws AlgebricksException; // variables @@ -88,7 +87,8 @@ * @return for each child, one vector of required physical properties */ - public PhysicalRequirements getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector requiredProperties); + public PhysicalRequirements getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector requiredProperties, + IOptimizationContext context); /** * @return the physical properties that this operator delivers, based on diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java index 6badac7..328697d 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java @@ -28,6 +28,7 @@ import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor; import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; @@ -35,52 +36,54 @@ public interface IOptimizationContext extends ITypingContext, IVariableContext { @Override - public abstract IMetadataProvider<?, ?> getMetadataProvider(); + public IMetadataProvider<?, ?> getMetadataProvider(); - public abstract void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider); + public void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider); - public abstract boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op); + public boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op); - public abstract void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op); + public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op); /* * returns true if op1 and op2 have already been compared */ - public abstract boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2); + public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2); - public abstract void removeFromAlreadyCompared(ILogicalOperator op1); + public void removeFromAlreadyCompared(ILogicalOperator op1); - public abstract void addNotToBeInlinedVar(LogicalVariable var); + public void addNotToBeInlinedVar(LogicalVariable var); - public abstract boolean shouldNotBeInlined(LogicalVariable var); + public boolean shouldNotBeInlined(LogicalVariable var); - public abstract void addPrimaryKey(FunctionalDependency pk); + public void addPrimaryKey(FunctionalDependency pk); - public abstract List<LogicalVariable> findPrimaryKey(LogicalVariable var); + public List<LogicalVariable> findPrimaryKey(LogicalVariable var); - public abstract void putEquivalenceClassMap(ILogicalOperator op, Map<LogicalVariable, EquivalenceClass> eqClassMap); + public void putEquivalenceClassMap(ILogicalOperator op, Map<LogicalVariable, EquivalenceClass> eqClassMap); - public abstract Map<LogicalVariable, EquivalenceClass> getEquivalenceClassMap(ILogicalOperator op); + public Map<LogicalVariable, EquivalenceClass> getEquivalenceClassMap(ILogicalOperator op); - public abstract void putFDList(ILogicalOperator op, List<FunctionalDependency> fdList); + public void putFDList(ILogicalOperator op, List<FunctionalDependency> fdList); - public abstract List<FunctionalDependency> getFDList(ILogicalOperator op); + public List<FunctionalDependency> getFDList(ILogicalOperator op); public void clearAllFDAndEquivalenceClasses(); - public abstract void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v); + public void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v); - public abstract ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op); + public ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op); - public abstract IExpressionEvalSizeComputer getExpressionEvalSizeComputer(); + public IExpressionEvalSizeComputer getExpressionEvalSizeComputer(); - public abstract IVariableEvalSizeEnvironment getVariableEvalSizeEnvironment(); + public IVariableEvalSizeEnvironment getVariableEvalSizeEnvironment(); - public abstract IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory(); + public IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory(); - public abstract PhysicalOptimizationConfig getPhysicalOptimizationConfig(); + public PhysicalOptimizationConfig getPhysicalOptimizationConfig(); - public abstract void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars); + public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars); - public abstract LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor(); + public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor(); + + public INodeDomain getComputationNodeDomain(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java index a3ef5d9..8c0ab2f 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java @@ -35,10 +35,12 @@ * @param reqdByParent * parent's requirements, which are not enforced for now, as we * only explore one plan + * @param context + * the optimization context * @return for each child, one vector of required physical properties */ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent); + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context); /** * @return the physical properties that this operator delivers, based on @@ -51,7 +53,7 @@ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) - throws AlgebricksException; + throws AlgebricksException; public void disableJobGenBelowMe(); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java index 5e9eef6..2410ca0 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java @@ -102,8 +102,8 @@ @Override public final PhysicalRequirements getRequiredPhysicalPropertiesForChildren( - IPhysicalPropertiesVector requiredProperties) { - return physicalOperator.getRequiredPropertiesForChildren(this, requiredProperties); + IPhysicalPropertiesVector requiredProperties, IOptimizationContext context) { + return physicalOperator.getRequiredPropertiesForChildren(this, requiredProperties, context); } /** diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java index 09d2253..7077014 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java @@ -86,7 +86,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2]; // In a cost-based optimizer, we would also try to propagate the // parent's partitioning requirements. @@ -97,12 +97,14 @@ if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { switch (partitioningType) { case PAIRWISE: { - pp1 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysLeftBranch), null); - pp2 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysRightBranch), null); + pp1 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysLeftBranch), + context.getComputationNodeDomain()); + pp2 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysRightBranch), + context.getComputationNodeDomain()); break; } case BROADCAST: { - pp2 = new BroadcastPartitioningProperty(null); + pp2 = new BroadcastPartitioningProperty(context.getComputationNodeDomain()); break; } default: { diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java index 2c93cd4..2d00e4c 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java @@ -138,7 +138,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1]; List<ILocalStructuralProperty> localProps = null; @@ -233,7 +233,8 @@ IPartitioningProperty pp = null; AbstractLogicalOperator aop = (AbstractLogicalOperator) op; if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) { - pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null); + pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), + context.getComputationNodeDomain()); } pv[0] = new StructuralPropertiesVector(pp, localProps); return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java index 2ab24d2..5159ac5 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java @@ -19,6 +19,7 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; @@ -26,7 +27,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java index 4b13645..3759956 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java @@ -69,7 +69,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { AbstractLogicalOperator op = (AbstractLogicalOperator) iop; if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { if (orderProp == null) { diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java index 2a2eb1e..3242fa0 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java @@ -72,7 +72,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { AggregateOperator aggOp = (AggregateOperator) op; StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1]; if (aggOp.isGlobal() && aggOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) { diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java index 5b6c40a..5aed63e 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java @@ -63,7 +63,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java index 03a8666..69bfe2a 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java @@ -61,7 +61,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java index 8ac3271..dda5456 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java @@ -72,7 +72,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { List<LogicalVariable> scanVariables = new ArrayList<>(); scanVariables.addAll(primaryKeys); scanVariables.add(new LogicalVariable(-1)); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java index 5823bc0..b3e8385 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java @@ -68,7 +68,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { DistributeResultOperator write = (DistributeResultOperator) op; IDataSink sink = write.getDataSink(); IPartitioningProperty pp = sink.getPartitioningProperty(); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java index 4e3a5bc..dcb7e15 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java @@ -56,7 +56,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return null; } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java index b15ea0b..7772620 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java @@ -135,12 +135,12 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { AbstractLogicalOperator aop = (AbstractLogicalOperator) op; if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) { StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1]; - pv[0] = new StructuralPropertiesVector( - new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnSet), null), null); + pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty( + new ListSet<LogicalVariable>(columnSet), context.getComputationNodeDomain()), null); return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); } else { return emptyUnaryRequirements(); @@ -206,13 +206,16 @@ } List<LogicalVariable> keyAndDecVariables = new ArrayList<LogicalVariable>(); - for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList()) + for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList()) { keyAndDecVariables.add(p.first); - for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) + } + for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) { keyAndDecVariables.add(GroupByOperator.getDecorVariable(p)); + } - for (LogicalVariable var : keyAndDecVariables) + for (LogicalVariable var : keyAndDecVariables) { aggOpInputEnv.setVarType(var, outputEnv.getVarType(var)); + } compileSubplans(inputSchemas[0], gby, opSchema, context); IOperatorDescriptorRegistry spec = builder.getJobSpec(); @@ -236,10 +239,12 @@ for (Object type : intermediateTypes) { aggOpInputEnv.setVarType(usedVars.get(i++), type); } - for (LogicalVariable keyVar : keyAndDecVariables) + for (LogicalVariable keyVar : keyAndDecVariables) { localInputSchemas[0].addVariable(keyVar); - for (LogicalVariable usedVar : usedVars) + } + for (LogicalVariable usedVar : usedVars) { localInputSchemas[0].addVariable(usedVar); + } for (i = 0; i < n; i++) { AggregateFunctionCallExpression mergeFun = (AggregateFunctionCallExpression) aggOp.getMergeExpressions() .get(i).getValue(); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java index 0ff1e47..34c707b 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java @@ -76,7 +76,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java index 21be272..17322b6 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java @@ -100,7 +100,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>(); List<OrderColumn> columns = new ArrayList<OrderColumn>(); for (OrderColumn oc : orderColumns) { diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java index b837bfa..50ab6aa 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java @@ -83,7 +83,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { List<LogicalVariable> scanVariables = new ArrayList<>(); scanVariables.addAll(primaryKeys); scanVariables.add(new LogicalVariable(-1)); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java index 4702361..f29fd6f 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java @@ -87,7 +87,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>(); scanVariables.addAll(primaryKeys); scanVariables.add(new LogicalVariable(-1)); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java index 2e4b647..f0bd603 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java @@ -80,7 +80,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>(); scanVariables.addAll(keys); scanVariables.add(new LogicalVariable(-1)); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java index b6d0f1f..5f43c3e 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java @@ -21,7 +21,9 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; @@ -40,8 +42,8 @@ import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; -import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider; @@ -61,7 +63,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { IntersectOperator intersectOp = (IntersectOperator) iop; StructuralPropertiesVector[] pv = new StructuralPropertiesVector[intersectOp.getNumInput()]; for (int i = 0; i < intersectOp.getNumInput(); i++) { @@ -73,7 +75,8 @@ localProps.add(new LocalOrderProperty(orderColumns)); IPartitioningProperty pp = null; if (intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { - pp = new RandomPartitioningProperty(null); + Set<LogicalVariable> partitioningVariables = new HashSet<>(intersectOp.getInputVariables(i)); + pp = new UnorderedPartitionedProperty(partitioningVariables, null); } pv[i] = new StructuralPropertiesVector(pp, localProps); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java index a154d91..c55a4ae 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java @@ -55,7 +55,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java index 7ff15d7..2622ae3 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java @@ -113,13 +113,14 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { if (partitioningType != JoinPartitioningType.BROADCAST) { throw new NotImplementedException(partitioningType + " nested loop joins are not implemented."); } StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2]; - pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null); + pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()), + null); pv[1] = new StructuralPropertiesVector(null, null); return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); } @@ -225,10 +226,11 @@ } boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength()); - if (result) + if (result) { return 0; - else + } else { return 1; + } } } @@ -256,28 +258,31 @@ @Override public byte[] getFieldData(int fIdx) { int leftFieldCount = refLeft.getFieldCount(); - if (fIdx < leftFieldCount) + if (fIdx < leftFieldCount) { return refLeft.getFieldData(fIdx); - else + } else { return refRight.getFieldData(fIdx - leftFieldCount); + } } @Override public int getFieldStart(int fIdx) { int leftFieldCount = refLeft.getFieldCount(); - if (fIdx < leftFieldCount) + if (fIdx < leftFieldCount) { return refLeft.getFieldStart(fIdx); - else + } else { return refRight.getFieldStart(fIdx - leftFieldCount); + } } @Override public int getFieldLength(int fIdx) { int leftFieldCount = refLeft.getFieldCount(); - if (fIdx < leftFieldCount) + if (fIdx < leftFieldCount) { return refLeft.getFieldLength(fIdx); - else + } else { return refRight.getFieldLength(fIdx - leftFieldCount); + } } @Override diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java index 7535c82..e8cc05f 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java @@ -89,7 +89,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return null; } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java index 27a5357..818e1ec 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java @@ -51,7 +51,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java index df65906..fe11f64 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java @@ -78,7 +78,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1]; List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>(); List<OrderColumn> orderColumns = new ArrayList<OrderColumn>(); @@ -89,7 +89,8 @@ IPartitioningProperty pp = null; AbstractLogicalOperator aop = (AbstractLogicalOperator) op; if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) { - pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null); + pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), + context.getComputationNodeDomain()); } pv[0] = new StructuralPropertiesVector(pp, localProps); return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); @@ -98,7 +99,7 @@ @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) - throws AlgebricksException { + throws AlgebricksException { IOperatorDescriptorRegistry spec = builder.getJobSpec(); int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]); @@ -119,11 +120,11 @@ keysAndDecs[i + keys.length] = fdColumns[i]; } - IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories( - columnList, context.getTypeEnvironment(op), context); + IBinaryComparatorFactory[] comparatorFactories = JobGenHelper + .variablesToAscBinaryComparatorFactories(columnList, context.getTypeEnvironment(op), context); IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {}; - IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory( - aggFactories, keysAndDecs); + IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aggFactories, + keysAndDecs); RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java index bfe8b36..e11a64f 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java @@ -51,7 +51,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java index af0087d..7237d24 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java @@ -85,7 +85,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java index d47c31f..1f70f2a 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java @@ -105,7 +105,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>(); List<OrderColumn> columns = new ArrayList<OrderColumn>(); for (OrderColumn oc : partitioningFields) { diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java index dca3d97..9046ce2 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java @@ -85,7 +85,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java index 19fb5d4..14a8f16 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java @@ -50,7 +50,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java index 2c842e3..8e4ca18 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java @@ -63,7 +63,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java index 6246ad2..71acecf 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java @@ -75,7 +75,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(op.getInputs().size()); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java index 0b100ee..35f9444 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java @@ -67,7 +67,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { WriteOperator write = (WriteOperator) op; IDataSink sink = write.getDataSink(); IPartitioningProperty pp = sink.getPartitioningProperty(); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java index 7465711..c08ff85 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java @@ -138,7 +138,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java index f264284..81f6e6b 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java @@ -121,7 +121,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>(sortColumns.length); localProps.add(new LocalOrderProperty(Arrays.asList(sortColumns))); StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null, diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java index f47e671..99be356 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java @@ -71,7 +71,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op; if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) { StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1]; diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java index 326172f..184cbbc 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java @@ -49,7 +49,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java index 2c40b3b..9b35922 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java @@ -54,7 +54,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java index bd46230..1f5159d 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java @@ -49,7 +49,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java index aa5672e..9fc0dd4 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java @@ -77,7 +77,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java index 98427fe..557a657 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java @@ -66,7 +66,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java index 2c407b7..e6517d0 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java @@ -64,7 +64,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { StructuralPropertiesVector pv0 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR; StructuralPropertiesVector pv1 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR; return new PhysicalRequirements(new StructuralPropertiesVector[] { pv0, pv1 }, diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java index 9c8ddc3..7ec1914 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java @@ -80,7 +80,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent) { + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>(); scanVariables.addAll(keys); scanVariables.add(new LogicalVariable(-1)); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java index 908dcc1..719c70e 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java @@ -18,26 +18,53 @@ */ package org.apache.hyracks.algebricks.core.algebra.properties; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType; + public class DefaultNodeGroupDomain implements INodeDomain { - private String groupName; + private List<String> nodes = new ArrayList<>(); - public DefaultNodeGroupDomain(String groupName) { - this.groupName = groupName; + public DefaultNodeGroupDomain(List<String> nodes) { + this.nodes.addAll(nodes); + } + + public DefaultNodeGroupDomain(DefaultNodeGroupDomain domain) { + this.nodes.addAll(domain.nodes); + } + + public DefaultNodeGroupDomain(AlgebricksPartitionConstraint clusterLocations) { + if (clusterLocations.getPartitionConstraintType() == PartitionConstraintType.ABSOLUTE) { + AlgebricksAbsolutePartitionConstraint absPc = (AlgebricksAbsolutePartitionConstraint) clusterLocations; + String[] locations = absPc.getLocations(); + for (String location : locations) { + nodes.add(location); + } + } else { + throw new IllegalStateException("A node domain can only take absolute location constraints."); + } } @Override public boolean sameAs(INodeDomain domain) { - return true; + if (!(domain instanceof DefaultNodeGroupDomain)) { + return false; + } + DefaultNodeGroupDomain nodeDomain = (DefaultNodeGroupDomain) domain; + return nodes.equals(nodeDomain.nodes); } @Override public String toString() { - return "AsterixDomain(" + groupName + ")"; + return nodes.toString(); } @Override public Integer cardinality() { - return null; + return nodes.size(); } } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java index d872491..e298691 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java @@ -31,7 +31,6 @@ public List<ILocalStructuralProperty> getLocalProperties(); /** - * * @param reqd * vector of required properties * @param equivalenceClasses diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java index 5808da1..44df740 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java @@ -83,8 +83,8 @@ @Override public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) { - for (OrderColumn orderColumn : orderColumns){ - if (varMap.containsKey(orderColumn.getColumn())){ + for (OrderColumn orderColumn : orderColumns) { + if (varMap.containsKey(orderColumn.getColumn())) { orderColumn.setColumn(varMap.get(orderColumn.getColumn())); } } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java index 711c0e7..0b8d759 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java @@ -55,9 +55,8 @@ return k; } - public static boolean matchLocalProperties(List<ILocalStructuralProperty> reqd, - List<ILocalStructuralProperty> dlvd, Map<LogicalVariable, EquivalenceClass> equivalenceClasses, - List<FunctionalDependency> fds) { + public static boolean matchLocalProperties(List<ILocalStructuralProperty> reqd, List<ILocalStructuralProperty> dlvd, + Map<LogicalVariable, EquivalenceClass> equivalenceClasses, List<FunctionalDependency> fds) { if (reqd == null) { return true; } @@ -118,7 +117,7 @@ boolean mayExpandProperties) { INodeDomain dom1 = reqd.getNodeDomain(); INodeDomain dom2 = dlvd.getNodeDomain(); - if (dom1 != null && dom2 != null && !dom1.sameAs(dom2)) { + if (!dom1.sameAs(dom2)) { return false; } @@ -132,10 +131,11 @@ case UNORDERED_PARTITIONED: { UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd; UnorderedPartitionedProperty ud = (UnorderedPartitionedProperty) dlvd; - if (mayExpandProperties) + if (mayExpandProperties) { return (!ud.getColumnSet().isEmpty() && ur.getColumnSet().containsAll(ud.getColumnSet())); - else + } else { return (ud.getColumnSet().equals(ur.getColumnSet())); + } } case ORDERED_PARTITIONED: { UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd; diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java index 7ae7e62..2cad1cb 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java @@ -33,7 +33,8 @@ public static final StructuralPropertiesVector EMPTY_PROPERTIES_VECTOR = new StructuralPropertiesVector(null, new ArrayList<ILocalStructuralProperty>()); - public StructuralPropertiesVector(IPartitioningProperty propPartitioning, List<ILocalStructuralProperty> propsLocal) { + public StructuralPropertiesVector(IPartitioningProperty propPartitioning, + List<ILocalStructuralProperty> propsLocal) { this.propPartitioning = propPartitioning; this.propsLocal = propsLocal; } @@ -63,7 +64,6 @@ } /** - * * @param reqd * vector of required properties * @return a vector of properties from pvector that are not delivered by the diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java index 17e0cb3..d6a364a 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java @@ -67,8 +67,8 @@ @Override public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) { - for (Map.Entry<LogicalVariable, LogicalVariable> var : varMap.entrySet()){ - if (columnSet.remove(var.getKey())){ + for (Map.Entry<LogicalVariable, LogicalVariable> var : varMap.entrySet()) { + if (columnSet.remove(var.getKey())) { columnSet.add(var.getValue()); } } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java index ac2ae5c..4671a71 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java @@ -228,19 +228,13 @@ } } if (partitionConstraintMap.get(opDesc) == null) { - if (opConstraint == null) { - if (parentOp != null) { - AlgebricksPartitionConstraint pc = partitionConstraintMap.get(parentOp); - if (pc != null) { - opConstraint = pc; - } else if ((opInputs == null || opInputs.size() == 0) && finalPass) { - opConstraint = new AlgebricksCountPartitionConstraint(1); - } - } - if (opConstraint == null && finalPass) { - opConstraint = clusterLocations; - } + if (finalPass && opConstraint == null && (opInputs == null || opInputs.size() == 0)) { + opConstraint = new AlgebricksCountPartitionConstraint(1); } + if (finalPass && opConstraint == null) { + opConstraint = clusterLocations; + } + // Sets up the location constraint. if (opConstraint != null) { partitionConstraintMap.put(opDesc, opConstraint); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, opConstraint); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java index e76b486..154f4a1 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; @@ -37,8 +38,10 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor; +import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain; import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; @SuppressWarnings({ "unchecked", "rawtypes" }) public class AlgebricksOptimizationContext implements IOptimizationContext { @@ -77,20 +80,22 @@ protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>(); private final IExpressionTypeComputer expressionTypeComputer; private final INullableTypeComputer nullableTypeComputer; + private final INodeDomain defaultNodeDomain; private final LogicalOperatorPrettyPrintVisitor prettyPrintVisitor; public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer, IMergeAggregationExpressionFactory mergeAggregationExpressionFactory, IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer, - PhysicalOptimizationConfig physicalOptimizationConfig) { + PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations) { this(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer, - nullableTypeComputer, physicalOptimizationConfig, new LogicalOperatorPrettyPrintVisitor()); + nullableTypeComputer, physicalOptimizationConfig, clusterLocations, + new LogicalOperatorPrettyPrintVisitor()); } public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer, IMergeAggregationExpressionFactory mergeAggregationExpressionFactory, IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer, - PhysicalOptimizationConfig physicalOptimizationConfig, + PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations, LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) { this.varCounter = varCounter; this.expressionEvalSizeComputer = expressionEvalSizeComputer; @@ -98,6 +103,7 @@ this.expressionTypeComputer = expressionTypeComputer; this.nullableTypeComputer = nullableTypeComputer; this.physicalOptimizationConfig = physicalOptimizationConfig; + this.defaultNodeDomain = new DefaultNodeGroupDomain(clusterLocations); this.prettyPrintVisitor = prettyPrintVisitor; } @@ -316,6 +322,11 @@ } @Override + public INodeDomain getComputationNodeDomain() { + return defaultNodeDomain; + } + + @Override public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor() { return prettyPrintVisitor; } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java index 996b988..ce77942 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.algebricks.core.rewriter.base; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer; import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer; @@ -29,5 +30,5 @@ IExpressionEvalSizeComputer expressionEvalSizeComputer, IMergeAggregationExpressionFactory mergeAggregationExpressionFactory, IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer, - PhysicalOptimizationConfig physicalOptimizationConfig); + PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations); } diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java index ae4f628..b5751fa 100644 --- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java +++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java @@ -29,7 +29,7 @@ import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; - +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder; @@ -145,13 +145,14 @@ } }); builder.setTypeTraitProvider(new ITypeTraitProvider() { + @Override public ITypeTraits getTypeTrait(Object type) { return null; } }); builder.setPrinterProvider(PigletPrinterFactoryProvider.INSTANCE); - builder.setExpressionRuntimeProvider(new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter( - new PigletExpressionJobGen())); + builder.setExpressionRuntimeProvider( + new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(new PigletExpressionJobGen())); builder.setExpressionTypeComputer(new IExpressionTypeComputer() { @Override public Object getType(ILogicalExpression expr, IMetadataProvider<?, ?> metadataProvider, @@ -159,6 +160,7 @@ return null; } }); + builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(new String[] { "nc1", "nc2" })); cFactory = builder.create(); metadataProvider = new PigletMetadataProvider(); } @@ -237,9 +239,8 @@ } PigletFileDataSource ds = new PigletFileDataSource(file, types.toArray()); rel.op = new DataSourceScanOperator(variables, ds); - rel.op.getInputs().add( - new MutableObject<ILogicalOperator>(previousOp == null ? new EmptyTupleSourceOperator() - : previousOp)); + rel.op.getInputs().add(new MutableObject<ILogicalOperator>( + previousOp == null ? new EmptyTupleSourceOperator() : previousOp)); return rel; } @@ -250,8 +251,9 @@ Relation inputRel = findInputRelation(alias, symMap); Pair<Relation, LogicalVariable> tempInput = translateScalarExpression(inputRel, conditionNode); Relation rel = new Relation(); - rel.op = new SelectOperator(new MutableObject<ILogicalExpression>(new VariableReferenceExpression( - tempInput.second)), false, null); + rel.op = new SelectOperator( + new MutableObject<ILogicalExpression>(new VariableReferenceExpression(tempInput.second)), false, + null); rel.op.getInputs().add(new MutableObject<ILogicalOperator>(tempInput.first.op)); rel.schema.putAll(tempInput.first.schema); return rel; @@ -298,7 +300,8 @@ for (ASTNode a : arguments) { Pair<Relation, LogicalVariable> argPair = translateScalarExpression(rel, (ExpressionNode) a); rel = argPair.first; - argExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(argPair.second))); + argExprs.add( + new MutableObject<ILogicalExpression>(new VariableReferenceExpression(argPair.second))); } Relation outRel = new Relation(); outRel.schema.putAll(rel.schema); diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java index 8bf1ad5..53f7dbf 100644 --- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java +++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -66,7 +66,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor; import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter; -import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain; import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType; @@ -92,8 +91,6 @@ import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { - - private static final INodeDomain DEFAULT_DOMAIN = new DefaultNodeGroupDomain("__DEFAULT"); private PhysicalOptimizationConfig physicalOptimizationConfig; @@ -127,7 +124,8 @@ PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(op, context); - StructuralPropertiesVector pvector = new StructuralPropertiesVector(new RandomPartitioningProperty(null), + StructuralPropertiesVector pvector = new StructuralPropertiesVector( + new RandomPartitioningProperty(context.getComputationNodeDomain()), new LinkedList<ILocalStructuralProperty>()); boolean changed = physOptimizeOp(opRef, pvector, false, context); op.computeDeliveredPhysicalProperties(context); @@ -196,7 +194,7 @@ boolean changed = false; AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); optimizeUsingConstraintsAndEquivClasses(op); - PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required); + PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required, context); IPhysicalPropertiesVector[] reqdProperties = null; if (pr != null) { reqdProperties = pr.getRequiredProperties(); @@ -220,7 +218,7 @@ } else { INodeDomain dom2 = delivered.getPartitioningProperty().getNodeDomain(); if (!childrenDomain.sameAs(dom2)) { - childrenDomain = DEFAULT_DOMAIN; + childrenDomain = context.getComputationNodeDomain(); } } j++; @@ -443,7 +441,7 @@ .getDeliveredPhysicalProperties(); addPartitioningEnforcers(op, childIndex, pp, required, deliveredByNewChild, domain, context); } else { - addPartitioningEnforcers(op, childIndex, pp, required, deliveredByChild, domain, context); + addPartitioningEnforcers(op, childIndex, pp, required, deliveredByChild, pp.getNodeDomain(), context); AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue(); IPhysicalPropertiesVector newDiff = newPropertiesDiff(newChild, required, true, context); AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> New properties diff: " + newDiff + "\n"); -- To view, visit https://asterix-gerrit.ics.uci.edu/718 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ic21d8da2cd457aa17cc9861c0b92ac5960978e03 Gerrit-PatchSet: 7 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
