Yingyi Bu has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/718
Change subject: ASTERIXDB-1343: support heterogeneity of computation nodes and
storage nodes.
......................................................................
ASTERIXDB-1343: support heterogeneity of computation nodes and storage nodes.
Change-Id: Ic21d8da2cd457aa17cc9861c0b92ac5960978e03
---
M
algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
M
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
M
algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
53 files changed, 195 insertions(+), 143 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/18/718/1
diff --git
a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 0d899b9..1d3a55c 100644
---
a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++
b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.compiler.api;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -51,11 +52,11 @@
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory
mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer,
INullableTypeComputer nullableTypeComputer,
- PhysicalOptimizationConfig physicalOptimizationConfig) {
+ PhysicalOptimizationConfig physicalOptimizationConfig,
AlgebricksPartitionConstraint clusterLocations) {
LogicalOperatorPrettyPrintVisitor prettyPrintVisitor = new
LogicalOperatorPrettyPrintVisitor();
return new AlgebricksOptimizationContext(varCounter,
expressionEvalSizeComputer,
mergeAggregationExpressionFactory, expressionTypeComputer,
nullableTypeComputer,
- physicalOptimizationConfig, prettyPrintVisitor);
+ physicalOptimizationConfig, clusterLocations,
prettyPrintVisitor);
}
}
@@ -77,7 +78,7 @@
int varCounter) {
final IOptimizationContext oc =
optCtxFactory.createOptimizationContext(varCounter,
expressionEvalSizeComputer,
mergeAggregationExpressionFactory, expressionTypeComputer,
- nullableTypeComputer, physicalOptimizationConfig);
+ nullableTypeComputer, physicalOptimizationConfig,
clusterLocations);
oc.setMetadataDeclarations(metadata);
final HeuristicOptimizer opt = new HeuristicOptimizer(plan,
logicalRewrites, physicalRewrites, oc);
return new ICompiler() {
@@ -92,13 +93,13 @@
IJobletEventListenerFactory
jobEventListenerFactory) throws AlgebricksException {
AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting Job
Generation.\n");
JobGenContext context = new JobGenContext(null,
metadata, appContext,
- serializerDeserializerProvider,
hashFunctionFactoryProvider,
- hashFunctionFamilyProvider,
comparatorFactoryProvider, typeTraitProvider,
- binaryBooleanInspectorFactory,
binaryIntegerInspectorFactory, printerProvider,
- nullWriterFactory,
normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
- expressionTypeComputer, nullableTypeComputer,
oc, expressionEvalSizeComputer,
- partialAggregationTypeComputer,
predEvaluatorFactoryProvider,
- physicalOptimizationConfig.getFrameSize(),
clusterLocations);
+ serializerDeserializerProvider,
hashFunctionFactoryProvider, hashFunctionFamilyProvider,
+ comparatorFactoryProvider, typeTraitProvider,
binaryBooleanInspectorFactory,
+ binaryIntegerInspectorFactory,
printerProvider, nullWriterFactory,
+ normalizedKeyComputerFactoryProvider,
expressionRuntimeProvider, expressionTypeComputer,
+ nullableTypeComputer, oc,
expressionEvalSizeComputer, partialAggregationTypeComputer,
+ predEvaluatorFactoryProvider,
physicalOptimizationConfig.getFrameSize(),
+ clusterLocations);
PlanCompiler pc = new PlanCompiler(context);
return pc.compilePlan(plan, null,
jobEventListenerFactory);
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index 9d68fae..707a7db 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -22,7 +22,6 @@
import java.util.Map;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
@@ -66,7 +65,7 @@
public void contributeRuntimeOperator(IHyracksJobBuilder builder,
JobGenContext context,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
- throws AlgebricksException;
+ throws AlgebricksException;
// variables
@@ -88,7 +87,8 @@
* @return for each child, one vector of required physical properties
*/
- public PhysicalRequirements
getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector
requiredProperties);
+ public PhysicalRequirements
getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector
requiredProperties,
+ IOptimizationContext context);
/**
* @return the physical properties that this operator delivers, based on
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 6badac7..328697d 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -28,6 +28,7 @@
import
org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
import
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
@@ -35,52 +36,54 @@
public interface IOptimizationContext extends ITypingContext, IVariableContext
{
@Override
- public abstract IMetadataProvider<?, ?> getMetadataProvider();
+ public IMetadataProvider<?, ?> getMetadataProvider();
- public abstract void setMetadataDeclarations(IMetadataProvider<?, ?>
metadataProvider);
+ public void setMetadataDeclarations(IMetadataProvider<?, ?>
metadataProvider);
- public abstract boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule,
ILogicalOperator op);
+ public boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule,
ILogicalOperator op);
- public abstract void addToDontApplySet(IAlgebraicRewriteRule rule,
ILogicalOperator op);
+ public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator
op);
/*
* returns true if op1 and op2 have already been compared
*/
- public abstract boolean checkAndAddToAlreadyCompared(ILogicalOperator op1,
ILogicalOperator op2);
+ public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1,
ILogicalOperator op2);
- public abstract void removeFromAlreadyCompared(ILogicalOperator op1);
+ public void removeFromAlreadyCompared(ILogicalOperator op1);
- public abstract void addNotToBeInlinedVar(LogicalVariable var);
+ public void addNotToBeInlinedVar(LogicalVariable var);
- public abstract boolean shouldNotBeInlined(LogicalVariable var);
+ public boolean shouldNotBeInlined(LogicalVariable var);
- public abstract void addPrimaryKey(FunctionalDependency pk);
+ public void addPrimaryKey(FunctionalDependency pk);
- public abstract List<LogicalVariable> findPrimaryKey(LogicalVariable var);
+ public List<LogicalVariable> findPrimaryKey(LogicalVariable var);
- public abstract void putEquivalenceClassMap(ILogicalOperator op,
Map<LogicalVariable, EquivalenceClass> eqClassMap);
+ public void putEquivalenceClassMap(ILogicalOperator op,
Map<LogicalVariable, EquivalenceClass> eqClassMap);
- public abstract Map<LogicalVariable, EquivalenceClass>
getEquivalenceClassMap(ILogicalOperator op);
+ public Map<LogicalVariable, EquivalenceClass>
getEquivalenceClassMap(ILogicalOperator op);
- public abstract void putFDList(ILogicalOperator op,
List<FunctionalDependency> fdList);
+ public void putFDList(ILogicalOperator op, List<FunctionalDependency>
fdList);
- public abstract List<FunctionalDependency> getFDList(ILogicalOperator op);
+ public List<FunctionalDependency> getFDList(ILogicalOperator op);
public void clearAllFDAndEquivalenceClasses();
- public abstract void putLogicalPropertiesVector(ILogicalOperator op,
ILogicalPropertiesVector v);
+ public void putLogicalPropertiesVector(ILogicalOperator op,
ILogicalPropertiesVector v);
- public abstract ILogicalPropertiesVector
getLogicalPropertiesVector(ILogicalOperator op);
+ public ILogicalPropertiesVector
getLogicalPropertiesVector(ILogicalOperator op);
- public abstract IExpressionEvalSizeComputer
getExpressionEvalSizeComputer();
+ public IExpressionEvalSizeComputer getExpressionEvalSizeComputer();
- public abstract IVariableEvalSizeEnvironment
getVariableEvalSizeEnvironment();
+ public IVariableEvalSizeEnvironment getVariableEvalSizeEnvironment();
- public abstract IMergeAggregationExpressionFactory
getMergeAggregationExpressionFactory();
+ public IMergeAggregationExpressionFactory
getMergeAggregationExpressionFactory();
- public abstract PhysicalOptimizationConfig getPhysicalOptimizationConfig();
+ public PhysicalOptimizationConfig getPhysicalOptimizationConfig();
- public abstract void updatePrimaryKeys(Map<LogicalVariable,
LogicalVariable> mappedVars);
+ public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable>
mappedVars);
- public abstract LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
+ public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
+
+ public INodeDomain getComputationNodeDomain();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index a3ef5d9..8ae96f0 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -35,10 +35,13 @@
* @param reqdByParent
* parent's requirements, which are not enforced for now, as we
* only explore one plan
+ * @param context
+ * the optimization context, which supplies the computation node
+ * domain used when building partitioning requirements
* @return for each child, one vector of required physical properties
*/
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent);
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context);
/**
* @return the physical properties that this operator delivers, based on
@@ -51,7 +54,7 @@
public void contributeRuntimeOperator(IHyracksJobBuilder builder,
JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
- throws AlgebricksException;
+ throws AlgebricksException;
public void disableJobGenBelowMe();
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 5e9eef6..2410ca0 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -102,8 +102,8 @@
@Override
public final PhysicalRequirements getRequiredPhysicalPropertiesForChildren(
- IPhysicalPropertiesVector requiredProperties) {
- return physicalOperator.getRequiredPropertiesForChildren(this,
requiredProperties);
+ IPhysicalPropertiesVector requiredProperties, IOptimizationContext
context) {
+ return physicalOperator.getRequiredPropertiesForChildren(this,
requiredProperties, context);
}
/**
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 09d2253..7077014 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -86,7 +86,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator iop,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
// In a cost-based optimizer, we would also try to propagate the
// parent's partitioning requirements.
@@ -97,12 +97,14 @@
if (op.getExecutionMode() ==
AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
switch (partitioningType) {
case PAIRWISE: {
- pp1 = new UnorderedPartitionedProperty(new
ListSet<LogicalVariable>(keysLeftBranch), null);
- pp2 = new UnorderedPartitionedProperty(new
ListSet<LogicalVariable>(keysRightBranch), null);
+ pp1 = new UnorderedPartitionedProperty(new
ListSet<LogicalVariable>(keysLeftBranch),
+ context.getComputationNodeDomain());
+ pp2 = new UnorderedPartitionedProperty(new
ListSet<LogicalVariable>(keysRightBranch),
+ context.getComputationNodeDomain());
break;
}
case BROADCAST: {
- pp2 = new BroadcastPartitioningProperty(null);
+ pp2 = new
BroadcastPartitioningProperty(context.getComputationNodeDomain());
break;
}
default: {
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 2c93cd4..2d00e4c 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -138,7 +138,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
List<ILocalStructuralProperty> localProps = null;
@@ -233,7 +233,8 @@
IPartitioningProperty pp = null;
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
- pp = new UnorderedPartitionedProperty(new
ListSet<LogicalVariable>(columnList), null);
+ pp = new UnorderedPartitionedProperty(new
ListSet<LogicalVariable>(columnList),
+ context.getComputationNodeDomain());
}
pv[0] = new StructuralPropertiesVector(pp, localProps);
return new PhysicalRequirements(pv,
IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
index 2ab24d2..5159ac5 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -26,7 +27,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 4b13645..3759956 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -69,7 +69,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator iop,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
if (op.getExecutionMode() ==
AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
if (orderProp == null) {
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 2a2eb1e..3242fa0 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -72,7 +72,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
AggregateOperator aggOp = (AggregateOperator) op;
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
if (aggOp.isGlobal() && aggOp.getExecutionMode() ==
AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 5b6c40a..5aed63e 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -63,7 +63,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
index 03a8666..69bfe2a 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
@@ -61,7 +61,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 036ac05..62af599 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -69,7 +69,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
List<LogicalVariable> scanVariables = new ArrayList<>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 5823bc0..b3e8385 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -68,7 +68,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
DistributeResultOperator write = (DistributeResultOperator) op;
IDataSink sink = write.getDataSink();
IPartitioningProperty pp = sink.getPartitioningProperty();
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
index 4e3a5bc..dcb7e15 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
@@ -56,7 +56,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return null;
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index b15ea0b..7772620 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -135,12 +135,12 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
StructuralPropertiesVector[] pv = new
StructuralPropertiesVector[1];
- pv[0] = new StructuralPropertiesVector(
- new UnorderedPartitionedProperty(new
ListSet<LogicalVariable>(columnSet), null), null);
+ pv[0] = new StructuralPropertiesVector(new
UnorderedPartitionedProperty(
+ new ListSet<LogicalVariable>(columnSet),
context.getComputationNodeDomain()), null);
return new PhysicalRequirements(pv,
IPartitioningRequirementsCoordinator.NO_COORDINATION);
} else {
return emptyUnaryRequirements();
@@ -206,13 +206,16 @@
}
List<LogicalVariable> keyAndDecVariables = new
ArrayList<LogicalVariable>();
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p :
gby.getGroupByList())
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p :
gby.getGroupByList()) {
keyAndDecVariables.add(p.first);
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p :
gby.getDecorList())
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p :
gby.getDecorList()) {
keyAndDecVariables.add(GroupByOperator.getDecorVariable(p));
+ }
- for (LogicalVariable var : keyAndDecVariables)
+ for (LogicalVariable var : keyAndDecVariables) {
aggOpInputEnv.setVarType(var, outputEnv.getVarType(var));
+ }
compileSubplans(inputSchemas[0], gby, opSchema, context);
IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -236,10 +239,12 @@
for (Object type : intermediateTypes) {
aggOpInputEnv.setVarType(usedVars.get(i++), type);
}
- for (LogicalVariable keyVar : keyAndDecVariables)
+ for (LogicalVariable keyVar : keyAndDecVariables) {
localInputSchemas[0].addVariable(keyVar);
- for (LogicalVariable usedVar : usedVars)
+ }
+ for (LogicalVariable usedVar : usedVars) {
localInputSchemas[0].addVariable(usedVar);
+ }
for (i = 0; i < n; i++) {
AggregateFunctionCallExpression mergeFun =
(AggregateFunctionCallExpression) aggOp.getMergeExpressions()
.get(i).getValue();
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 0ff1e47..34c707b 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -76,7 +76,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index 21be272..17322b6 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -100,7 +100,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
List<ILocalStructuralProperty> orderProps = new
LinkedList<ILocalStructuralProperty>();
List<OrderColumn> columns = new ArrayList<OrderColumn>();
for (OrderColumn oc : orderColumns) {
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
index b837bfa..50ab6aa 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -83,7 +83,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
List<LogicalVariable> scanVariables = new ArrayList<>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 4702361..f29fd6f 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -87,7 +87,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index d844f37..cdce9e3 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -80,7 +80,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(keys);
scanVariables.add(new LogicalVariable(-1));
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index b6d0f1f..d5b493e 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -61,7 +61,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator iop,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
IntersectOperator intersectOp = (IntersectOperator) iop;
StructuralPropertiesVector[] pv = new
StructuralPropertiesVector[intersectOp.getNumInput()];
for (int i = 0; i < intersectOp.getNumInput(); i++) {
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
index a154d91..c55a4ae 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
@@ -55,7 +55,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index 7ff15d7..2622ae3 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -113,13 +113,14 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
if (partitioningType != JoinPartitioningType.BROADCAST) {
throw new NotImplementedException(partitioningType + " nested loop
joins are not implemented.");
}
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
- pv[0] = new StructuralPropertiesVector(new
BroadcastPartitioningProperty(null), null);
+ pv[0] = new StructuralPropertiesVector(new
BroadcastPartitioningProperty(context.getComputationNodeDomain()),
+ null);
pv[1] = new StructuralPropertiesVector(null, null);
return new PhysicalRequirements(pv,
IPartitioningRequirementsCoordinator.NO_COORDINATION);
}
@@ -225,10 +226,11 @@
}
boolean result =
binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
p.getLength());
- if (result)
+ if (result) {
return 0;
- else
+ } else {
return 1;
+ }
}
}
@@ -256,28 +258,31 @@
@Override
public byte[] getFieldData(int fIdx) {
int leftFieldCount = refLeft.getFieldCount();
- if (fIdx < leftFieldCount)
+ if (fIdx < leftFieldCount) {
return refLeft.getFieldData(fIdx);
- else
+ } else {
return refRight.getFieldData(fIdx - leftFieldCount);
+ }
}
@Override
public int getFieldStart(int fIdx) {
int leftFieldCount = refLeft.getFieldCount();
- if (fIdx < leftFieldCount)
+ if (fIdx < leftFieldCount) {
return refLeft.getFieldStart(fIdx);
- else
+ } else {
return refRight.getFieldStart(fIdx - leftFieldCount);
+ }
}
@Override
public int getFieldLength(int fIdx) {
int leftFieldCount = refLeft.getFieldCount();
- if (fIdx < leftFieldCount)
+ if (fIdx < leftFieldCount) {
return refLeft.getFieldLength(fIdx);
- else
+ } else {
return refRight.getFieldLength(fIdx - leftFieldCount);
+ }
}
@Override
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
index 7535c82..e8cc05f 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
@@ -89,7 +89,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return null;
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
index 27a5357..818e1ec 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
@@ -51,7 +51,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index df65906..fe11f64 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -78,7 +78,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
List<ILocalStructuralProperty> localProps = new
ArrayList<ILocalStructuralProperty>();
List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
@@ -89,7 +89,8 @@
IPartitioningProperty pp = null;
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
- pp = new UnorderedPartitionedProperty(new
ListSet<LogicalVariable>(columnList), null);
+ pp = new UnorderedPartitionedProperty(new
ListSet<LogicalVariable>(columnList),
+ context.getComputationNodeDomain());
}
pv[0] = new StructuralPropertiesVector(pp, localProps);
return new PhysicalRequirements(pv,
IPartitioningRequirementsCoordinator.NO_COORDINATION);
@@ -98,7 +99,7 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder,
JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
+ throws AlgebricksException {
IOperatorDescriptorRegistry spec = builder.getJobSpec();
int keys[] = JobGenHelper.variablesToFieldIndexes(columnList,
inputSchemas[0]);
@@ -119,11 +120,11 @@
keysAndDecs[i + keys.length] = fdColumns[i];
}
- IBinaryComparatorFactory[] comparatorFactories =
JobGenHelper.variablesToAscBinaryComparatorFactories(
- columnList, context.getTypeEnvironment(op), context);
+ IBinaryComparatorFactory[] comparatorFactories = JobGenHelper
+ .variablesToAscBinaryComparatorFactories(columnList,
context.getTypeEnvironment(op), context);
IAggregateEvaluatorFactory[] aggFactories = new
IAggregateEvaluatorFactory[] {};
- IAggregatorDescriptorFactory aggregatorFactory = new
SimpleAlgebricksAccumulatingAggregatorFactory(
- aggFactories, keysAndDecs);
+ IAggregatorDescriptorFactory aggregatorFactory = new
SimpleAlgebricksAccumulatingAggregatorFactory(aggFactories,
+ keysAndDecs);
RecordDescriptor recordDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
context);
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
index bfe8b36..e11a64f 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
@@ -51,7 +51,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
index af0087d..7237d24 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -85,7 +85,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
index d47c31f..1f70f2a 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
@@ -105,7 +105,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
List<ILocalStructuralProperty> orderProps = new
LinkedList<ILocalStructuralProperty>();
List<OrderColumn> columns = new ArrayList<OrderColumn>();
for (OrderColumn oc : partitioningFields) {
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
index dca3d97..9046ce2 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -85,7 +85,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index 19fb5d4..14a8f16 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -50,7 +50,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 2c842e3..8e4ca18 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -63,7 +63,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index 6246ad2..71acecf 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -75,7 +75,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements(op.getInputs().size());
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index 0b100ee..35f9444 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -67,7 +67,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
WriteOperator write = (WriteOperator) op;
IDataSink sink = write.getDataSink();
IPartitioningProperty pp = sink.getPartitioningProperty();
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index 7465711..c08ff85 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -138,7 +138,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
index f264284..81f6e6b 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
@@ -121,7 +121,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
List<ILocalStructuralProperty> localProps = new
ArrayList<ILocalStructuralProperty>(sortColumns.length);
localProps.add(new LocalOrderProperty(Arrays.asList(sortColumns)));
StructuralPropertiesVector[] r = new StructuralPropertiesVector[] {
new StructuralPropertiesVector(null,
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index f47e671..99be356 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -71,7 +71,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
if (limitOp.getExecutionMode() ==
AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
StructuralPropertiesVector[] pv = new
StructuralPropertiesVector[1];
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index 326172f..184cbbc 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -49,7 +49,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
index 2c40b3b..9b35922 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
@@ -54,7 +54,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
index bd46230..1f5159d 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
@@ -49,7 +49,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index aa5672e..9fc0dd4 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -77,7 +77,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
index 98427fe..557a657 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -66,7 +66,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
return emptyUnaryRequirements();
}
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index 2c407b7..e6517d0 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -64,7 +64,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
StructuralPropertiesVector pv0 =
StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
StructuralPropertiesVector pv1 =
StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
return new PhysicalRequirements(new StructuralPropertiesVector[] {
pv0, pv1 },
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
index 9c8ddc3..7ec1914 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
@@ -80,7 +80,7 @@
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(keys);
scanVariables.add(new LogicalVariable(-1));
diff --git
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
index 908dcc1..719c70e 100644
---
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
+++
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
@@ -18,26 +18,53 @@
*/
package org.apache.hyracks.algebricks.core.algebra.properties;
+import java.util.ArrayList;
+import java.util.List;
+
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+
public class DefaultNodeGroupDomain implements INodeDomain {
- private String groupName;
+ private List<String> nodes = new ArrayList<>();
- public DefaultNodeGroupDomain(String groupName) {
- this.groupName = groupName;
+ public DefaultNodeGroupDomain(List<String> nodes) {
+ this.nodes.addAll(nodes);
+ }
+
+ public DefaultNodeGroupDomain(DefaultNodeGroupDomain domain) {
+ this.nodes.addAll(domain.nodes);
+ }
+
+ public DefaultNodeGroupDomain(AlgebricksPartitionConstraint clusterLocations) {
+ if (clusterLocations.getPartitionConstraintType() == PartitionConstraintType.ABSOLUTE) {
+ AlgebricksAbsolutePartitionConstraint absPc = (AlgebricksAbsolutePartitionConstraint) clusterLocations;
+ String[] locations = absPc.getLocations();
+ for (String location : locations) {
+ nodes.add(location);
+ }
+ } else {
+ throw new IllegalStateException("A node domain can only take absolute location constraints.");
+ }
}
@Override
public boolean sameAs(INodeDomain domain) {
- return true;
+ if (!(domain instanceof DefaultNodeGroupDomain)) {
+ return false;
+ }
+ DefaultNodeGroupDomain nodeDomain = (DefaultNodeGroupDomain) domain;
+ return nodes.equals(nodeDomain.nodes);
}
@Override
public String toString() {
- return "AsterixDomain(" + groupName + ")";
+ return nodes.toString();
}
@Override
public Integer cardinality() {
- return null;
+ return nodes.size();
}
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
index d872491..e298691 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
@@ -31,7 +31,6 @@
public List<ILocalStructuralProperty> getLocalProperties();
/**
- *
* @param reqd
* vector of required properties
* @param equivalenceClasses
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index 711c0e7..0b8d759 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -55,9 +55,8 @@
return k;
}
- public static boolean matchLocalProperties(List<ILocalStructuralProperty> reqd,
- List<ILocalStructuralProperty> dlvd, Map<LogicalVariable, EquivalenceClass> equivalenceClasses,
- List<FunctionalDependency> fds) {
+ public static boolean matchLocalProperties(List<ILocalStructuralProperty> reqd, List<ILocalStructuralProperty> dlvd,
+ Map<LogicalVariable, EquivalenceClass> equivalenceClasses, List<FunctionalDependency> fds) {
if (reqd == null) {
return true;
}
@@ -118,7 +117,7 @@
boolean mayExpandProperties) {
INodeDomain dom1 = reqd.getNodeDomain();
INodeDomain dom2 = dlvd.getNodeDomain();
- if (dom1 != null && dom2 != null && !dom1.sameAs(dom2)) {
+ if (!dom1.sameAs(dom2)) {
return false;
}
@@ -132,10 +131,11 @@
case UNORDERED_PARTITIONED: {
UnorderedPartitionedProperty ur =
(UnorderedPartitionedProperty) reqd;
UnorderedPartitionedProperty ud =
(UnorderedPartitionedProperty) dlvd;
- if (mayExpandProperties)
+ if (mayExpandProperties) {
return (!ud.getColumnSet().isEmpty() &&
ur.getColumnSet().containsAll(ud.getColumnSet()));
- else
+ } else {
return
(ud.getColumnSet().equals(ur.getColumnSet()));
+ }
}
case ORDERED_PARTITIONED: {
UnorderedPartitionedProperty ur =
(UnorderedPartitionedProperty) reqd;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java
index 7ae7e62..2cad1cb 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java
@@ -33,7 +33,8 @@
public static final StructuralPropertiesVector EMPTY_PROPERTIES_VECTOR =
new StructuralPropertiesVector(null,
new ArrayList<ILocalStructuralProperty>());
- public StructuralPropertiesVector(IPartitioningProperty propPartitioning,
List<ILocalStructuralProperty> propsLocal) {
+ public StructuralPropertiesVector(IPartitioningProperty propPartitioning,
+ List<ILocalStructuralProperty> propsLocal) {
this.propPartitioning = propPartitioning;
this.propsLocal = propsLocal;
}
@@ -63,7 +64,6 @@
}
/**
- *
* @param reqd
* vector of required properties
* @return a vector of properties from pvector that are not delivered by the
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index ac2ae5c..4671a71 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -228,19 +228,13 @@
}
}
if (partitionConstraintMap.get(opDesc) == null) {
- if (opConstraint == null) {
- if (parentOp != null) {
- AlgebricksPartitionConstraint pc =
partitionConstraintMap.get(parentOp);
- if (pc != null) {
- opConstraint = pc;
- } else if ((opInputs == null || opInputs.size() == 0) &&
finalPass) {
- opConstraint = new
AlgebricksCountPartitionConstraint(1);
- }
- }
- if (opConstraint == null && finalPass) {
- opConstraint = clusterLocations;
- }
+ if (finalPass && opConstraint == null && (opInputs == null ||
opInputs.size() == 0)) {
+ opConstraint = new AlgebricksCountPartitionConstraint(1);
}
+ if (finalPass && opConstraint == null) {
+ opConstraint = clusterLocations;
+ }
+ // Sets up the location constraint.
if (opConstraint != null) {
partitionConstraintMap.put(opDesc, opConstraint);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec,
opDesc, opConstraint);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index e76b486..154f4a1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -37,8 +38,10 @@
import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import
org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import
org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class AlgebricksOptimizationContext implements IOptimizationContext {
@@ -77,20 +80,22 @@
protected final Map<ILogicalOperator, ILogicalPropertiesVector>
logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>();
private final IExpressionTypeComputer expressionTypeComputer;
private final INullableTypeComputer nullableTypeComputer;
+ private final INodeDomain defaultNodeDomain;
private final LogicalOperatorPrettyPrintVisitor prettyPrintVisitor;
public AlgebricksOptimizationContext(int varCounter,
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory
mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer,
INullableTypeComputer nullableTypeComputer,
- PhysicalOptimizationConfig physicalOptimizationConfig) {
+ PhysicalOptimizationConfig physicalOptimizationConfig,
AlgebricksPartitionConstraint clusterLocations) {
this(varCounter, expressionEvalSizeComputer,
mergeAggregationExpressionFactory, expressionTypeComputer,
- nullableTypeComputer, physicalOptimizationConfig, new
LogicalOperatorPrettyPrintVisitor());
+ nullableTypeComputer, physicalOptimizationConfig,
clusterLocations,
+ new LogicalOperatorPrettyPrintVisitor());
}
public AlgebricksOptimizationContext(int varCounter,
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory
mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer,
INullableTypeComputer nullableTypeComputer,
- PhysicalOptimizationConfig physicalOptimizationConfig,
+ PhysicalOptimizationConfig physicalOptimizationConfig,
AlgebricksPartitionConstraint clusterLocations,
LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
this.varCounter = varCounter;
this.expressionEvalSizeComputer = expressionEvalSizeComputer;
@@ -98,6 +103,7 @@
this.expressionTypeComputer = expressionTypeComputer;
this.nullableTypeComputer = nullableTypeComputer;
this.physicalOptimizationConfig = physicalOptimizationConfig;
+ this.defaultNodeDomain = new DefaultNodeGroupDomain(clusterLocations);
this.prettyPrintVisitor = prettyPrintVisitor;
}
@@ -316,6 +322,11 @@
}
@Override
+ public INodeDomain getComputationNodeDomain() {
+ return defaultNodeDomain;
+ }
+
+ @Override
public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor() {
return prettyPrintVisitor;
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
index 996b988..ce77942 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.core.rewriter.base;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
import
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
@@ -29,5 +30,5 @@
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory
mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer,
INullableTypeComputer nullableTypeComputer,
- PhysicalOptimizationConfig physicalOptimizationConfig);
+ PhysicalOptimizationConfig physicalOptimizationConfig,
AlgebricksPartitionConstraint clusterLocations);
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 8bf1ad5..53f7dbf 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -66,7 +66,6 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import
org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
import
org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
-import
org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
@@ -92,8 +91,6 @@
import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
-
- private static final INodeDomain DEFAULT_DOMAIN = new DefaultNodeGroupDomain("__DEFAULT");
private PhysicalOptimizationConfig physicalOptimizationConfig;
@@ -127,7 +124,8 @@
PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(op, context);
- StructuralPropertiesVector pvector = new
StructuralPropertiesVector(new RandomPartitioningProperty(null),
+ StructuralPropertiesVector pvector = new StructuralPropertiesVector(
+ new
RandomPartitioningProperty(context.getComputationNodeDomain()),
new LinkedList<ILocalStructuralProperty>());
boolean changed = physOptimizeOp(opRef, pvector, false, context);
op.computeDeliveredPhysicalProperties(context);
@@ -196,7 +194,7 @@
boolean changed = false;
AbstractLogicalOperator op = (AbstractLogicalOperator)
opRef.getValue();
optimizeUsingConstraintsAndEquivClasses(op);
- PhysicalRequirements pr =
op.getRequiredPhysicalPropertiesForChildren(required);
+ PhysicalRequirements pr =
op.getRequiredPhysicalPropertiesForChildren(required, context);
IPhysicalPropertiesVector[] reqdProperties = null;
if (pr != null) {
reqdProperties = pr.getRequiredProperties();
@@ -220,7 +218,7 @@
} else {
INodeDomain dom2 =
delivered.getPartitioningProperty().getNodeDomain();
if (!childrenDomain.sameAs(dom2)) {
- childrenDomain = DEFAULT_DOMAIN;
+ childrenDomain = context.getComputationNodeDomain();
}
}
j++;
@@ -443,7 +441,7 @@
.getDeliveredPhysicalProperties();
addPartitioningEnforcers(op, childIndex, pp, required,
deliveredByNewChild, domain, context);
} else {
- addPartitioningEnforcers(op, childIndex, pp, required,
deliveredByChild, domain, context);
+ addPartitioningEnforcers(op, childIndex, pp, required,
deliveredByChild, pp.getNodeDomain(), context);
AbstractLogicalOperator newChild = (AbstractLogicalOperator)
op.getInputs().get(childIndex).getValue();
IPhysicalPropertiesVector newDiff = newPropertiesDiff(newChild,
required, true, context);
AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> New properties
diff: " + newDiff + "\n");
--
To view, visit https://asterix-gerrit.ics.uci.edu/718
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic21d8da2cd457aa17cc9861c0b92ac5960978e03
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>