http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java index 85f1aa7..e4b35b7 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java @@ -38,7 +38,7 @@ import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.costs.CostEstimator; import org.apache.flink.optimizer.costs.Costs; import org.apache.flink.optimizer.dataproperties.GlobalProperties; @@ -105,8 +105,8 @@ public class DataSourceNode extends OptimizerNode { * @return The contract. 
*/ @Override - public GenericDataSourceBase<?, ?> getPactContract() { - return (GenericDataSourceBase<?, ?>) super.getPactContract(); + public GenericDataSourceBase<?, ?> getOperator() { + return (GenericDataSourceBase<?, ?>) super.getOperator(); } @Override @@ -123,8 +123,8 @@ public class DataSourceNode extends OptimizerNode { } @Override - public List<PactConnection> getIncomingConnections() { - return Collections.<PactConnection>emptyList(); + public List<DagConnection> getIncomingConnections() { + return Collections.<DagConnection>emptyList(); } @Override @@ -139,13 +139,13 @@ public class DataSourceNode extends OptimizerNode { String inFormatDescription = "<unknown>"; try { - format = getPactContract().getFormatWrapper().getUserCodeObject(); - Configuration config = getPactContract().getParameters(); + format = getOperator().getFormatWrapper().getUserCodeObject(); + Configuration config = getOperator().getParameters(); format.configure(config); } catch (Throwable t) { - if (PactCompiler.LOG.isWarnEnabled()) { - PactCompiler.LOG.warn("Could not instantiate InputFormat to obtain statistics." + if (Optimizer.LOG.isWarnEnabled()) { + Optimizer.LOG.warn("Could not instantiate InputFormat to obtain statistics." 
+ " Limited statistics will be available.", t); } return; @@ -158,7 +158,7 @@ public class DataSourceNode extends OptimizerNode { } // first of all, get the statistics from the cache - final String statisticsKey = getPactContract().getStatisticsKey(); + final String statisticsKey = getOperator().getStatisticsKey(); final BaseStatistics cachedStatistics = statistics.getBaseStatistics(statisticsKey); BaseStatistics bs = null; @@ -166,16 +166,16 @@ public class DataSourceNode extends OptimizerNode { bs = format.getStatistics(cachedStatistics); } catch (Throwable t) { - if (PactCompiler.LOG.isWarnEnabled()) { - PactCompiler.LOG.warn("Error obtaining statistics from input format: " + t.getMessage(), t); + if (Optimizer.LOG.isWarnEnabled()) { + Optimizer.LOG.warn("Error obtaining statistics from input format: " + t.getMessage(), t); } } if (bs != null) { final long len = bs.getTotalInputSize(); if (len == BaseStatistics.SIZE_UNKNOWN) { - if (PactCompiler.LOG.isInfoEnabled()) { - PactCompiler.LOG.info("Compiler could not determine the size of input '" + inFormatDescription + "'. Using default estimates."); + if (Optimizer.LOG.isInfoEnabled()) { + Optimizer.LOG.info("Compiler could not determine the size of input '" + inFormatDescription + "'. 
Using default estimates."); } } else if (len >= 0) { @@ -207,14 +207,14 @@ public class DataSourceNode extends OptimizerNode { return this.cachedPlans; } - SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getPactContract().getName()+")", + SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getOperator().getName()+")", this.gprops, this.lprops); if(!replicatedInput) { candidate.updatePropertiesWithUniqueSets(getUniqueFields()); final Costs costs = new Costs(); - if (FileInputFormat.class.isAssignableFrom(getPactContract().getFormatWrapper().getUserCodeClass()) && + if (FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass()) && this.estimatedOutputSize >= 0) { estimator.addFileInputCost(this.estimatedOutputSize, costs); } @@ -223,10 +223,10 @@ public class DataSourceNode extends OptimizerNode { // replicated input final Costs costs = new Costs(); InputFormat<?,?> inputFormat = - ((ReplicatingInputFormat<?,?>)getPactContract().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat(); + ((ReplicatingInputFormat<?,?>) getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat(); if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) && this.estimatedOutputSize >= 0) { - estimator.addFileInputCost(this.estimatedOutputSize * this.getDegreeOfParallelism(), costs); + estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), costs); } candidate.setCosts(costs); }
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java index 5370678..482951b 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java @@ -16,10 +16,12 @@ * limitations under the License. */ - package org.apache.flink.optimizer.dag; - +/** + * Methods for operators / connections that provide estimates about data size and + * characteristics. + */ public interface EstimateProvider { /** http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java index c8dda12..118ddc8 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java @@ -41,8 +41,8 @@ public class FilterNode extends SingleInputNode { } @Override - public FilterOperatorBase<?, ?> getPactContract() { - return (FilterOperatorBase<?, ?>) super.getPactContract(); + public FilterOperatorBase<?, ?> getOperator() { + return (FilterOperatorBase<?, ?>) super.getOperator(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java ---------------------------------------------------------------------- diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java index bb5799c..f713d56 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java @@ -40,8 +40,8 @@ public class FlatMapNode extends SingleInputNode { } @Override - public FlatMapOperatorBase<?, ?, ?> getPactContract() { - return (FlatMapOperatorBase<?, ?, ?>) super.getPactContract(); + public FlatMapOperatorBase<?, ?, ?> getOperator() { + return (FlatMapOperatorBase<?, ?, ?>) super.getOperator(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java index 2e2576d..564c0d3 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java @@ -54,7 +54,7 @@ public class GroupCombineNode extends SingleInputNode { private List<OperatorDescriptorSingle> initPossibleProperties() { // check if we can work with a grouping (simple reducer), or if we need ordering because of a group order - Ordering groupOrder = getPactContract().getGroupOrder(); + Ordering groupOrder = getOperator().getGroupOrder(); if (groupOrder != null && groupOrder.getNumberOfFields() == 0) { groupOrder = null; } @@ -74,8 +74,8 @@ public class GroupCombineNode extends SingleInputNode { * @return The operator represented by this optimizer node. 
*/ @Override - public GroupCombineOperatorBase<?, ?, ?> getPactContract() { - return (GroupCombineOperatorBase<?, ?, ?>) super.getPactContract(); + public GroupCombineOperatorBase<?, ?, ?> getOperator() { + return (GroupCombineOperatorBase<?, ?, ?>) super.getOperator(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java index 88ec53a..77acae5 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java @@ -26,7 +26,7 @@ import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.operators.AllGroupReduceProperties; import org.apache.flink.optimizer.operators.AllGroupWithPartialPreGroupProperties; import org.apache.flink.optimizer.operators.GroupReduceProperties; @@ -67,17 +67,17 @@ public class GroupReduceNode extends SingleInputNode { private List<OperatorDescriptorSingle> initPossibleProperties(Partitioner<?> customPartitioner) { // see if an internal hint dictates the strategy to use - final Configuration conf = getPactContract().getParameters(); - final String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null); + final Configuration conf = getOperator().getParameters(); + final String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null); final boolean useCombiner; if 
(localStrategy != null) { - if (PactCompiler.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) { + if (Optimizer.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) { useCombiner = false; } - else if (PactCompiler.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) { + else if (Optimizer.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) { if (!isCombineable()) { - PactCompiler.LOG.warn("Strategy hint for GroupReduce '" + getPactContract().getName() + + Optimizer.LOG.warn("Strategy hint for GroupReduce '" + getOperator().getName() + "' requires combinable reduce, but user function is not marked combinable."); } useCombiner = true; @@ -90,8 +90,8 @@ public class GroupReduceNode extends SingleInputNode { // check if we can work with a grouping (simple reducer), or if we need ordering because of a group order Ordering groupOrder = null; - if (getPactContract() instanceof GroupReduceOperatorBase) { - groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) getPactContract()).getGroupOrder(); + if (getOperator() instanceof GroupReduceOperatorBase) { + groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) getOperator()).getGroupOrder(); if (groupOrder != null && groupOrder.getNumberOfFields() == 0) { groupOrder = null; } @@ -112,8 +112,8 @@ public class GroupReduceNode extends SingleInputNode { * @return The operator represented by this optimizer node. */ @Override - public GroupReduceOperatorBase<?, ?, ?> getPactContract() { - return (GroupReduceOperatorBase<?, ?, ?>) super.getPactContract(); + public GroupReduceOperatorBase<?, ?, ?> getOperator() { + return (GroupReduceOperatorBase<?, ?, ?>) super.getOperator(); } /** @@ -123,7 +123,7 @@ public class GroupReduceNode extends SingleInputNode { * @return True, if a combiner has been given, false otherwise. 
*/ public boolean isCombineable() { - return getPactContract().isCombinable(); + return getOperator().isCombinable(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java index 3c67108..cbd58ca 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java @@ -27,7 +27,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.operators.AbstractJoinDescriptor; import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties; import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties; @@ -62,8 +62,8 @@ public class JoinNode extends TwoInputNode { * @return The contract. 
*/ @Override - public JoinOperatorBase<?, ?, ?, ?> getPactContract() { - return (JoinOperatorBase<?, ?, ?, ?>) super.getPactContract(); + public JoinOperatorBase<?, ?, ?, ?> getOperator() { + return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator(); } @Override @@ -116,21 +116,21 @@ public class JoinNode extends TwoInputNode { { // see if an internal hint dictates the strategy to use Configuration conf = joinOperatorBase.getParameters(); - String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null); + String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null); if (localStrategy != null) { final AbstractJoinDescriptor fixedDriverStrat; - if (PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) || - PactCompiler.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) || - PactCompiler.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) || - PactCompiler.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) ) + if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) ) { fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2); } - else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) { + else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) { fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2); } - else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) { + else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) { fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java 
---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java index cbcab2b..de3cd22 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java @@ -26,7 +26,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties; import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties; import org.apache.flink.optimizer.operators.OperatorDescriptorDual; @@ -58,8 +58,8 @@ public class MatchNode extends TwoInputNode { * @return The contract. 
*/ @Override - public JoinOperatorBase<?, ?, ?, ?> getPactContract() { - return (JoinOperatorBase<?, ?, ?, ?>) super.getPactContract(); + public JoinOperatorBase<?, ?, ?, ?> getOperator() { + return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator(); } @Override @@ -110,19 +110,19 @@ public class MatchNode extends TwoInputNode { private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint) { // see if an internal hint dictates the strategy to use Configuration conf = joinOperatorBase.getParameters(); - String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null); + String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null); if (localStrategy != null) { final OperatorDescriptorDual fixedDriverStrat; - if (PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) || - PactCompiler.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) || - PactCompiler.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) || - PactCompiler.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) ) + if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) ) { fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2); - } else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) { + } else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) { fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2); - } else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) { + } else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) { fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2); } else { throw new 
CompilerException("Invalid local strategy hint for match contract: " + localStrategy); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java index baeb1f7..0cad34e 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.optimizer.dag; import java.util.ArrayList; @@ -48,8 +47,19 @@ import org.apache.flink.util.Visitable; import org.apache.flink.util.Visitor; /** - * This class represents a node in the optimizer's internal representation of the PACT plan. It contains - * extra information about estimates, hints and data properties. + * The OptimizerNode is the base class of all nodes in the optimizer DAG. The optimizer DAG is the + * optimizer's representation of a program, created before the actual optimization (which creates different + * candidate plans and computes their cost). + * <p> + * Nodes in the DAG correspond (almost) one-to-one to the operators in a program. 
The optimizer DAG is constructed + * to hold the additional information that the optimizer needs: + * <ul> + * <li>Estimates of the data size processed by each operator</li> + * <li>Helper structures to track where the data flow "splits" and "joins", to support flows that are + * DAGs but not trees.</li> + * <li>Tags and weights to differentiate between loop-variant and -invariant parts of an iteration</li> + * <li>Interesting properties to be used during the enumeration of candidate plans</li> + * </ul> */ public abstract class OptimizerNode implements Visitable<OptimizerNode>, EstimateProvider, DumpableNode<OptimizerNode> { @@ -59,13 +69,13 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat // Members // -------------------------------------------------------------------------------------------- - private final Operator<?> pactContract; // The operator (Reduce / Join / DataSource / ...) + private final Operator<?> operator; // The operator (Reduce / Join / DataSource / ...) 
private List<String> broadcastConnectionNames = new ArrayList<String>(); // the broadcast inputs names of this node - private List<PactConnection> broadcastConnections = new ArrayList<PactConnection>(); // the broadcast inputs of this node + private List<DagConnection> broadcastConnections = new ArrayList<DagConnection>(); // the broadcast inputs of this node - private List<PactConnection> outgoingConnections; // The links to succeeding nodes + private List<DagConnection> outgoingConnections; // The links to succeeding nodes private InterestingProperties intProps; // the interesting properties of this node @@ -88,7 +98,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat // --------------------------------- General Parameters --------------------------------------- - private int degreeOfParallelism = -1; // the number of parallel instances of this node + private int parallelism = -1; // the number of parallel instances of this node private long minimalMemoryPerSubTask = -1; @@ -105,18 +115,17 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat // ------------------------------------------------------------------------ /** - * Creates a new node for the optimizer plan. + * Creates a new optimizer node that represents the given program operator. * * @param op The operator that the node represents. 
*/ public OptimizerNode(Operator<?> op) { - this.pactContract = op; + this.operator = op; readStubAnnotations(); } protected OptimizerNode(OptimizerNode toCopy) { - this.pactContract = toCopy.pactContract; - + this.operator = toCopy.operator; this.intProps = toCopy.intProps; this.openBranches = toCopy.openBranches; @@ -125,7 +134,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat this.estimatedOutputSize = toCopy.estimatedOutputSize; this.estimatedNumRecords = toCopy.estimatedNumRecords; - this.degreeOfParallelism = toCopy.degreeOfParallelism; + this.parallelism = toCopy.parallelism; this.minimalMemoryPerSubTask = toCopy.minimalMemoryPerSubTask; this.id = toCopy.id; @@ -156,7 +165,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat ExecutionMode defaultExchangeMode); /** - * This function connects the predecessors to this operator. + * This function connects the operators that produce the broadcast inputs to this operator. * * @param operatorToNode The map from program operators to optimizer nodes. 
* @param defaultExchangeMode The data exchange mode to use, if the operator does not @@ -164,21 +173,19 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat * * @throws CompilerException */ - public void setBroadcastInputs(Map<Operator<?>, OptimizerNode> operatorToNode, ExecutionMode defaultExchangeMode) - throws CompilerException - { + public void setBroadcastInputs(Map<Operator<?>, OptimizerNode> operatorToNode, ExecutionMode defaultExchangeMode) { // skip for Operators that don't support broadcast variables - if (!(getPactContract() instanceof AbstractUdfOperator<?, ?>)) { + if (!(getOperator() instanceof AbstractUdfOperator<?, ?>)) { return; } // get all broadcast inputs - AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, ?>) getPactContract()); + AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, ?>) getOperator()); // create connections and add them for (Map.Entry<String, Operator<?>> input : operator.getBroadcastInputs().entrySet()) { OptimizerNode predecessor = operatorToNode.get(input.getValue()); - PactConnection connection = new PactConnection(predecessor, this, + DagConnection connection = new DagConnection(predecessor, this, ShipStrategyType.BROADCAST, defaultExchangeMode); addBroadcastConnection(input.getKey(), connection); predecessor.addOutgoingConnection(connection); @@ -186,11 +193,12 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat } /** + * Gets all incoming connections of this node. * This method needs to be overridden by subclasses to return the children. * * @return The list of incoming connections. */ - public abstract List<PactConnection> getIncomingConnections(); + public abstract List<DagConnection> getIncomingConnections(); /** * Tells the node to compute the interesting properties for its inputs. 
The interesting properties @@ -215,7 +223,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat List<UnclosedBranchDescriptor> branchesSoFar) { // handle the data flow branching for the broadcast inputs - for (PactConnection broadcastInput : getBroadcastConnections()) { + for (DagConnection broadcastInput : getBroadcastConnections()) { OptimizerNode bcSource = broadcastInput.getSource(); addClosedBranches(bcSource.closedBranchingNodes); @@ -263,11 +271,11 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat public Iterable<OptimizerNode> getPredecessors() { List<OptimizerNode> allPredecessors = new ArrayList<OptimizerNode>(); - for (PactConnection pactConnection : getIncomingConnections()) { - allPredecessors.add(pactConnection.getSource()); + for (DagConnection dagConnection : getIncomingConnections()) { + allPredecessors.add(dagConnection.getSource()); } - for (PactConnection conn : getBroadcastConnections()) { + for (DagConnection conn : getBroadcastConnections()) { allPredecessors.add(conn.getSource()); } @@ -306,7 +314,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat * * @param broadcastConnection The connection to add. */ - public void addBroadcastConnection(String name, PactConnection broadcastConnection) { + public void addBroadcastConnection(String name, DagConnection broadcastConnection) { this.broadcastConnectionNames.add(name); this.broadcastConnections.add(broadcastConnection); } @@ -321,26 +329,26 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat /** * Return the list of inputs associated with broadcast variables for this node. */ - public List<PactConnection> getBroadcastConnections() { + public List<DagConnection> getBroadcastConnections() { return this.broadcastConnections; } /** * Adds a new outgoing connection to this node. * - * @param pactConnection + * @param connection * The connection to add. 
*/ - public void addOutgoingConnection(PactConnection pactConnection) { + public void addOutgoingConnection(DagConnection connection) { if (this.outgoingConnections == null) { - this.outgoingConnections = new ArrayList<PactConnection>(); + this.outgoingConnections = new ArrayList<DagConnection>(); } else { if (this.outgoingConnections.size() == 64) { throw new CompilerException("Cannot currently handle nodes with more than 64 outputs."); } } - this.outgoingConnections.add(pactConnection); + this.outgoingConnections.add(connection); } /** @@ -348,7 +356,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat * * @return The list of outgoing connections. */ - public List<PactConnection> getOutgoingConnections() { + public List<DagConnection> getOutgoingConnections() { return this.outgoingConnections; } @@ -357,8 +365,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat * * @return This node's operator. */ - public Operator<?> getPactContract() { - return this.pactContract; + public Operator<?> getOperator() { + return this.operator; } /** @@ -369,8 +377,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat * * @return The parallelism of the operator. */ - public int getDegreeOfParallelism() { - return this.degreeOfParallelism; + public int getParallelism() { + return this.parallelism; } /** @@ -386,7 +394,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat if (parallelism < 1 && parallelism != -1) { throw new IllegalArgumentException("Degree of parallelism of " + parallelism + " is invalid."); } - this.degreeOfParallelism = parallelism; + this.parallelism = parallelism; } /** @@ -395,7 +403,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat * @return The total amount of memory across all subtasks. */ public long getMinimalMemoryAcrossAllSubTasks() { - return this.minimalMemoryPerSubTask == -1 ? 
-1 : this.minimalMemoryPerSubTask * this.degreeOfParallelism; + return this.minimalMemoryPerSubTask == -1 ? -1 : this.minimalMemoryPerSubTask * this.parallelism; } public boolean isOnDynamicPath() { @@ -406,13 +414,13 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat boolean anyDynamic = false; boolean allDynamic = true; - for (PactConnection conn : getIncomingConnections()) { + for (DagConnection conn : getIncomingConnections()) { boolean dynamicIn = conn.isOnDynamicPath(); anyDynamic |= dynamicIn; allDynamic &= dynamicIn; } - for (PactConnection conn : getBroadcastConnections()) { + for (DagConnection conn : getBroadcastConnections()) { boolean dynamicIn = conn.isOnDynamicPath(); anyDynamic |= dynamicIn; allDynamic &= dynamicIn; @@ -424,7 +432,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat if (!allDynamic) { // this node joins static and dynamic path. // mark the connections where the source is not dynamic as cached - for (PactConnection conn : getIncomingConnections()) { + for (DagConnection conn : getIncomingConnections()) { if (!conn.getSource().isOnDynamicPath()) { conn.setMaterializationMode(conn.getMaterializationMode().makeCached()); } @@ -442,10 +450,10 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat public int getMaxDepth() { int maxDepth = 0; - for (PactConnection conn : getIncomingConnections()) { + for (DagConnection conn : getIncomingConnections()) { maxDepth = Math.max(maxDepth, conn.getMaxDepth()); } - for (PactConnection conn : getBroadcastConnections()) { + for (DagConnection conn : getBroadcastConnections()) { maxDepth = Math.max(maxDepth, conn.getMaxDepth()); } @@ -502,7 +510,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat if (this.outgoingConnections == null) { throw new IllegalStateException("The outgoing connections have not yet been initialized."); } - for (PactConnection conn : 
getOutgoingConnections()) { + for (DagConnection conn : getOutgoingConnections()) { conn.markBreaksPipeline(); } } @@ -517,7 +525,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat * @return True, if on all outgoing connections, the interesting properties are set. False otherwise. */ public boolean haveAllOutputConnectionInterestingProperties() { - for (PactConnection conn : getOutgoingConnections()) { + for (DagConnection conn : getOutgoingConnections()) { if (conn.getInterestingProperties() == null) { return false; } @@ -535,7 +543,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat * leaves the original objects, contained by the connections, unchanged. */ public void computeUnionOfInterestingPropertiesFromSuccessors() { - List<PactConnection> conns = getOutgoingConnections(); + List<DagConnection> conns = getOutgoingConnections(); if (conns.size() == 0) { // no incoming, we have none ourselves this.intProps = new InterestingProperties(); @@ -550,10 +558,10 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat public void clearInterestingProperties() { this.intProps = null; - for (PactConnection conn : getIncomingConnections()) { + for (DagConnection conn : getIncomingConnections()) { conn.clearInterestingProperties(); } - for (PactConnection conn : getBroadcastConnections()) { + for (DagConnection conn : getBroadcastConnections()) { conn.clearInterestingProperties(); } } @@ -570,7 +578,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat */ public void computeOutputEstimates(DataStatistics statistics) { // sanity checking - for (PactConnection c : getIncomingConnections()) { + for (DagConnection c : getIncomingConnections()) { if (c.getSource() == null) { throw new CompilerException("Bug: Estimate computation called before inputs have been set."); } @@ -587,11 +595,11 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat } // overwrite default estimates with hints, if given - if (getPactContract() == null || getPactContract().getCompilerHints() == null) { + if (getOperator() == null || getOperator().getCompilerHints() == null) { return ; } - CompilerHints hints = getPactContract().getCompilerHints(); + CompilerHints hints = getOperator().getCompilerHints(); if (hints.getOutputSize() >= 0) { this.estimatedOutputSize = hints.getOutputSize(); } @@ -643,8 +651,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat } protected void readUniqueFieldsAnnotation() { - if (this.pactContract.getCompilerHints() != null) { - Set<FieldSet> uniqueFieldSets = pactContract.getCompilerHints().getUniqueFields(); + if (this.operator.getCompilerHints() != null) { + Set<FieldSet> uniqueFieldSets = operator.getCompilerHints().getUniqueFields(); if (uniqueFieldSets != null) { if (this.uniqueFields == null) { this.uniqueFields = new HashSet<FieldSet>(); @@ -686,7 +694,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat // for pruning, we are quasi AFTER the node, so in the presence of // branches, we need form the per-branch-choice groups by the choice - // they made at the latest unjoined branching node. Note that this is + // they made at the latest un-joined branching node. Note that this is // different from the check for branch compatibility of candidates, as // this happens on the input sub-plans and hence BEFORE the node (therefore // it is relevant to find the latest (partially) joined branch point. 
@@ -708,12 +716,12 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat @Override public int compare(PlanNode o1, PlanNode o2) { - for (int i = 0; i < branchDeterminers.length; i++) { - PlanNode n1 = o1.getCandidateAtBranchPoint(branchDeterminers[i]); - PlanNode n2 = o2.getCandidateAtBranchPoint(branchDeterminers[i]); + for (OptimizerNode branchDeterminer : branchDeterminers) { + PlanNode n1 = o1.getCandidateAtBranchPoint(branchDeterminer); + PlanNode n2 = o2.getCandidateAtBranchPoint(branchDeterminer); int hash1 = System.identityHashCode(n1); int hash2 = System.identityHashCode(n2); - + if (hash1 != hash2) { return hash1 - hash2; } @@ -775,8 +783,10 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat protected void prunePlanAlternativesWithCommonBranching(List<PlanNode> plans) { // for each interesting property, which plans are cheapest - final RequestedGlobalProperties[] gps = (RequestedGlobalProperties[]) this.intProps.getGlobalProperties().toArray(new RequestedGlobalProperties[this.intProps.getGlobalProperties().size()]); - final RequestedLocalProperties[] lps = (RequestedLocalProperties[]) this.intProps.getLocalProperties().toArray(new RequestedLocalProperties[this.intProps.getLocalProperties().size()]); + final RequestedGlobalProperties[] gps = this.intProps.getGlobalProperties().toArray( + new RequestedGlobalProperties[this.intProps.getGlobalProperties().size()]); + final RequestedLocalProperties[] lps = this.intProps.getLocalProperties().toArray( + new RequestedLocalProperties[this.intProps.getLocalProperties().size()]); final PlanNode[][] toKeep = new PlanNode[gps.length][]; final PlanNode[] cheapestForGlobal = new PlanNode[gps.length]; @@ -831,14 +841,12 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat plans.add(cheapest); cheapest.setPruningMarker(); // remember that that plan is in the set } - - // skip the top down delta cost check for now (TODO: implement 
this) + // add all others, which are optimal for some interesting properties for (int i = 0; i < gps.length; i++) { if (toKeep[i] != null) { final PlanNode[] localMatches = toKeep[i]; - for (int k = 0; k < localMatches.length; k++) { - final PlanNode n = localMatches[k]; + for (final PlanNode n : localMatches) { if (n != null && !n.isPruneMarkerSet()) { n.setPruningMarker(); plans.add(n); @@ -873,7 +881,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat } - protected List<UnclosedBranchDescriptor> getBranchesForParent(PactConnection toParent) { + protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection toParent) { if (this.outgoingConnections.size() == 1) { // return our own stack of open branches, because nothing is added if (this.openBranches == null || this.openBranches.isEmpty()) { @@ -954,8 +962,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat * a) There is no branch in the sub-plan of this node * b) Both candidates have the same candidate as the child at the last open branch. * - * @param plan1 - * @param plan2 + * @param plan1 The root node of the first candidate plan. + * @param plan2 The root node of the second candidate plan. * @return True if the nodes are branch compatible in the inputs. 
*/ protected boolean areBranchCompatible(PlanNode plan1, PlanNode plan2) { @@ -1086,39 +1094,6 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat Collections.reverse(result); return didCloseABranch; } - - /** - * - */ - public static final class UnclosedBranchDescriptor - { - protected OptimizerNode branchingNode; - - protected long joinedPathsVector; - - /** - * @param branchingNode - * @param joinedPathsVector - */ - protected UnclosedBranchDescriptor(OptimizerNode branchingNode, long joinedPathsVector) - { - this.branchingNode = branchingNode; - this.joinedPathsVector = joinedPathsVector; - } - - public OptimizerNode getBranchingNode() { - return this.branchingNode; - } - - public long getJoinedPathsVector() { - return this.joinedPathsVector; - } - - @Override - public String toString() { - return "(" + this.branchingNode.getPactContract() + ") [" + this.joinedPathsVector + "]"; - } - } @Override public OptimizerNode getOptimizerNode() { @@ -1145,14 +1120,53 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat StringBuilder bld = new StringBuilder(); bld.append(getName()); - bld.append(" (").append(getPactContract().getName()).append(") "); + bld.append(" (").append(getOperator().getName()).append(") "); int i = 1; - for (PactConnection conn : getIncomingConnections()) { + for (DagConnection conn : getIncomingConnections()) { String shipStrategyName = conn.getShipStrategy() == null ? "null" : conn.getShipStrategy().name(); bld.append('(').append(i++).append(":").append(shipStrategyName).append(')'); } return bld.toString(); } + + // -------------------------------------------------------------------------------------------- + + /** + * Description of an unclosed branch. An unclosed branch is when the data flow branched (one operator's + * result is consumed by multiple targets), but these different branches (targets) have not been joined + * together. 
+ */ + public static final class UnclosedBranchDescriptor { + + protected OptimizerNode branchingNode; + + protected long joinedPathsVector; + + /** + * Creates a new branching descriptor. + * + * @param branchingNode The node where the branch occurred (the node with multiple outputs). + * @param joinedPathsVector A bit vector describing which branches are tracked by this descriptor. + * The bit vector is one, where the branch is tracked, zero otherwise. + */ + protected UnclosedBranchDescriptor(OptimizerNode branchingNode, long joinedPathsVector) { + this.branchingNode = branchingNode; + this.joinedPathsVector = joinedPathsVector; + } + + public OptimizerNode getBranchingNode() { + return this.branchingNode; + } + + public long getJoinedPathsVector() { + return this.joinedPathsVector; + } + + @Override + public String toString() { + return "(" + this.branchingNode.getOperator() + ") [" + this.joinedPathsVector + "]"; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java deleted file mode 100644 index 661ceb5..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.dag; - -import org.apache.flink.api.common.ExecutionMode; -import org.apache.flink.optimizer.dataproperties.InterestingProperties; -import org.apache.flink.optimizer.plandump.DumpableConnection; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; - -/** - * A connection between to operators. Represents an intermediate result - * and a data exchange between the two operators. - * - * The data exchange has a mode in which it performs (batch / pipelined) - * - * The data exchange strategy may be set on this connection, in which case - * it is fixed and will not be determined during candidate plan enumeration. - * - * During the enumeration of interesting properties, this connection also holds - * all interesting properties generated by the successor operator. - */ -public class PactConnection implements EstimateProvider, DumpableConnection<OptimizerNode> { - - private final OptimizerNode source; // The source node of the connection - - private final OptimizerNode target; // The target node of the connection. - - private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange - - private InterestingProperties interestingProps; // local properties that succeeding nodes are interested in - - private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined. 
- - private TempMode materializationMode = TempMode.NONE; // the materialization mode - - private int maxDepth = -1; - - private boolean breakPipeline; // whether this connection should break the pipeline due to potential deadlocks - - /** - * Creates a new Connection between two nodes. The shipping strategy is by default <tt>NONE</tt>. - * The temp mode is by default <tt>NONE</tt>. - * - * @param source - * The source node. - * @param target - * The target node. - */ - public PactConnection(OptimizerNode source, OptimizerNode target, ExecutionMode exchangeMode) { - this(source, target, null, exchangeMode); - } - - /** - * Creates a new Connection between two nodes. - * - * @param source - * The source node. - * @param target - * The target node. - * @param shipStrategy - * The shipping strategy. - * @param exchangeMode - * The data exchange mode (pipelined / batch / batch only for shuffles / ... ) - */ - public PactConnection(OptimizerNode source, OptimizerNode target, - ShipStrategyType shipStrategy, ExecutionMode exchangeMode) - { - if (source == null || target == null) { - throw new NullPointerException("Source and target must not be null."); - } - this.source = source; - this.target = target; - this.shipStrategy = shipStrategy; - this.dataExchangeMode = exchangeMode; - } - - /** - * Constructor to create a result from an operator that is not - * consumed by another operator. - * - * @param source - * The source node. - */ - public PactConnection(OptimizerNode source, ExecutionMode exchangeMode) { - if (source == null) { - throw new NullPointerException("Source and target must not be null."); - } - this.source = source; - this.target = null; - this.shipStrategy = ShipStrategyType.NONE; - this.dataExchangeMode = exchangeMode; - } - - /** - * Gets the source of the connection. - * - * @return The source Node. - */ - public OptimizerNode getSource() { - return this.source; - } - - /** - * Gets the target of the connection. - * - * @return The target node. 
- */ - public OptimizerNode getTarget() { - return this.target; - } - - /** - * Gets the shipping strategy for this connection. - * - * @return The connection's shipping strategy. - */ - public ShipStrategyType getShipStrategy() { - return this.shipStrategy; - } - - /** - * Sets the shipping strategy for this connection. - * - * @param strategy - * The shipping strategy to be applied to this connection. - */ - public void setShipStrategy(ShipStrategyType strategy) { - this.shipStrategy = strategy; - } - - /** - * Gets the data exchange mode to use for this connection. - * - * @return The data exchange mode to use for this connection. - */ - public ExecutionMode getDataExchangeMode() { - if (dataExchangeMode == null) { - throw new IllegalStateException("This connection does not have the data exchange mode set"); - } - return dataExchangeMode; - } - - /** - * Marks that this connection should do a decoupled data exchange (such as batched) - * rather then pipeline data. Connections are marked as pipeline breakers to avoid - * deadlock situations. - */ - public void markBreaksPipeline() { - this.breakPipeline = true; - } - - /** - * Checks whether this connection is marked to break the pipeline. - * - * @return True, if this connection is marked to break the pipeline, false otherwise. - */ - public boolean isBreakingPipeline() { - return this.breakPipeline; - } - - /** - * Gets the interesting properties object for this pact connection. - * If the interesting properties for this connections have not yet been set, - * this method returns null. - * - * @return The collection of all interesting properties, or null, if they have not yet been set. - */ - public InterestingProperties getInterestingProperties() { - return this.interestingProps; - } - - /** - * Sets the interesting properties for this pact connection. - * - * @param props The interesting properties. 
- */ - public void setInterestingProperties(InterestingProperties props) { - if (this.interestingProps == null) { - this.interestingProps = props; - } else { - throw new IllegalStateException("Interesting Properties have already been set."); - } - } - - public void clearInterestingProperties() { - this.interestingProps = null; - } - - public void initMaxDepth() { - - if (this.maxDepth == -1) { - this.maxDepth = this.source.getMaxDepth() + 1; - } else { - throw new IllegalStateException("Maximum path depth has already been initialized."); - } - } - - public int getMaxDepth() { - if (this.maxDepth != -1) { - return this.maxDepth; - } else { - throw new IllegalStateException("Maximum path depth has not been initialized."); - } - } - - // -------------------------------------------------------------------------------------------- - // Estimates - // -------------------------------------------------------------------------------------------- - - @Override - public long getEstimatedOutputSize() { - return this.source.getEstimatedOutputSize(); - } - - @Override - public long getEstimatedNumRecords() { - return this.source.getEstimatedNumRecords(); - } - - @Override - public float getEstimatedAvgWidthPerOutputRecord() { - return this.source.getEstimatedAvgWidthPerOutputRecord(); - } - - // -------------------------------------------------------------------------------------------- - - - public TempMode getMaterializationMode() { - return this.materializationMode; - } - - public void setMaterializationMode(TempMode materializationMode) { - this.materializationMode = materializationMode; - } - - public boolean isOnDynamicPath() { - return this.source.isOnDynamicPath(); - } - - public int getCostWeight() { - return this.source.getCostWeight(); - } - - // -------------------------------------------------------------------------------------------- - - public String toString() { - StringBuilder buf = new StringBuilder(50); - buf.append("Connection: "); - - if (this.source == 
null) { - buf.append("null"); - } else { - buf.append(this.source.getPactContract().getName()); - buf.append('(').append(this.source.getName()).append(')'); - } - - buf.append(" -> "); - - if (this.shipStrategy != null) { - buf.append('['); - buf.append(this.shipStrategy.name()); - buf.append(']').append(' '); - } - - if (this.target == null) { - buf.append("null"); - } else { - buf.append(this.target.getPactContract().getName()); - buf.append('(').append(this.target.getName()).append(')'); - } - - return buf.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java index 00b54ac..5c811b0 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java @@ -49,13 +49,13 @@ public class PartitionNode extends SingleInputNode { super(operator); OperatorDescriptorSingle descr = new PartitionDescriptor( - this.getPactContract().getPartitionMethod(), this.keys, operator.getCustomPartitioner()); + this.getOperator().getPartitionMethod(), this.keys, operator.getCustomPartitioner()); this.possibleProperties = Collections.singletonList(descr); } @Override - public PartitionOperatorBase<?> getPactContract() { - return (PartitionOperatorBase<?>) super.getPactContract(); + public PartitionOperatorBase<?> getOperator() { + return (PartitionOperatorBase<?>) super.getOperator(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java ---------------------------------------------------------------------- diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java index 6f4c43d..1477038 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java @@ -62,8 +62,8 @@ public class ReduceNode extends SingleInputNode { // ------------------------------------------------------------------------ @Override - public ReduceOperatorBase<?, ?> getPactContract() { - return (ReduceOperatorBase<?, ?>) super.getPactContract(); + public ReduceOperatorBase<?, ?> getOperator() { + return (ReduceOperatorBase<?, ?>) super.getOperator(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java index 9217beb..cc12bb8 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java @@ -36,7 +36,7 @@ import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.costs.CostEstimator; import org.apache.flink.optimizer.dataproperties.GlobalProperties; import org.apache.flink.optimizer.dataproperties.InterestingProperties; @@ -68,7 +68,7 @@ public abstract class SingleInputNode extends OptimizerNode { protected final FieldSet keys; // The 
set of key fields - protected PactConnection inConn; // the input of the node + protected DagConnection inConn; // the input of the node // -------------------------------------------------------------------------------------------- @@ -103,8 +103,8 @@ public abstract class SingleInputNode extends OptimizerNode { // -------------------------------------------------------------------------------------------- @Override - public SingleInputOperator<?, ?, ?> getPactContract() { - return (SingleInputOperator<?, ?, ?>) super.getPactContract(); + public SingleInputOperator<?, ?, ?> getOperator() { + return (SingleInputOperator<?, ?, ?>) super.getOperator(); } /** @@ -112,7 +112,7 @@ public abstract class SingleInputNode extends OptimizerNode { * * @return The input. */ - public PactConnection getIncomingConnection() { + public DagConnection getIncomingConnection() { return this.inConn; } @@ -121,7 +121,7 @@ public abstract class SingleInputNode extends OptimizerNode { * * @param inConn The input connection to set. 
*/ - public void setIncomingConnection(PactConnection inConn) { + public void setIncomingConnection(DagConnection inConn) { this.inConn = inConn; } @@ -139,14 +139,14 @@ public abstract class SingleInputNode extends OptimizerNode { } @Override - public List<PactConnection> getIncomingConnections() { + public List<DagConnection> getIncomingConnections() { return Collections.singletonList(this.inConn); } @Override public SemanticProperties getSemanticProperties() { - return getPactContract().getSemanticProperties(); + return getOperator().getSemanticProperties(); } @@ -155,18 +155,18 @@ public abstract class SingleInputNode extends OptimizerNode { throws CompilerException { // see if an internal hint dictates the strategy to use - final Configuration conf = getPactContract().getParameters(); - final String shipStrategy = conf.getString(PactCompiler.HINT_SHIP_STRATEGY, null); + final Configuration conf = getOperator().getParameters(); + final String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null); final ShipStrategyType preSet; if (shipStrategy != null) { - if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH)) { + if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH)) { preSet = ShipStrategyType.PARTITION_HASH; - } else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) { + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) { preSet = ShipStrategyType.PARTITION_RANGE; - } else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_FORWARD)) { + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_FORWARD)) { preSet = ShipStrategyType.FORWARD; - } else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) { + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { preSet = ShipStrategyType.PARTITION_RANDOM; } else { throw new 
CompilerException("Unrecognized ship strategy hint: " + shipStrategy); @@ -176,15 +176,15 @@ public abstract class SingleInputNode extends OptimizerNode { } // get the predecessor node - Operator<?> children = ((SingleInputOperator<?, ?, ?>) getPactContract()).getInput(); + Operator<?> children = ((SingleInputOperator<?, ?, ?>) getOperator()).getInput(); OptimizerNode pred; - PactConnection conn; + DagConnection conn; if (children == null) { - throw new CompilerException("Error: Node for '" + getPactContract().getName() + "' has no input."); + throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input."); } else { pred = contractToNode.get(children); - conn = new PactConnection(pred, this, defaultExchangeMode); + conn = new DagConnection(pred, this, defaultExchangeMode); if (preSet != null) { conn.setShipStrategy(preSet); } @@ -230,7 +230,7 @@ public abstract class SingleInputNode extends OptimizerNode { } this.inConn.setInterestingProperties(props); - for (PactConnection conn : getBroadcastConnections()) { + for (DagConnection conn : getBroadcastConnections()) { conn.setInterestingProperties(new InterestingProperties()); } } @@ -251,11 +251,11 @@ public abstract class SingleInputNode extends OptimizerNode { // calculate alternative sub-plans for broadcast inputs final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? 
extends NamedChannel>>(); - List<PactConnection> broadcastConnections = getBroadcastConnections(); + List<DagConnection> broadcastConnections = getBroadcastConnections(); List<String> broadcastConnectionNames = getBroadcastConnectionNames(); for (int i = 0; i < broadcastConnections.size(); i++ ) { - PactConnection broadcastConnection = broadcastConnections.get(i); + DagConnection broadcastConnection = broadcastConnections.get(i); String broadcastConnectionName = broadcastConnectionNames.get(i); List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator); @@ -283,8 +283,8 @@ public abstract class SingleInputNode extends OptimizerNode { final ExecutionMode executionMode = this.inConn.getDataExchangeMode(); - final int dop = getDegreeOfParallelism(); - final int inDop = getPredecessorNode().getDegreeOfParallelism(); + final int dop = getParallelism(); + final int inDop = getPredecessorNode().getParallelism(); final boolean dopChange = inDop != dop; final boolean breaksPipeline = this.inConn.isBreakingPipeline(); @@ -509,7 +509,7 @@ public abstract class SingleInputNode extends OptimizerNode { } else { throw new CompilerException(); } - for (PactConnection connection : getBroadcastConnections()) { + for (DagConnection connection : getBroadcastConnections()) { connection.getSource().accept(visitor); } visitor.postVisit(this); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java index 2d65b4d..40725ba 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java @@ -31,8 +31,9 @@ import 
org.apache.flink.optimizer.util.NoOpBinaryUdfOp; import org.apache.flink.types.Nothing; /** - * This class represents a utility node that is not part of the actual plan. It is used for plans with multiple data sinks to - * transform it into a plan with a single root node. That way, the code that makes sure no costs are double-counted and that + * This class represents a utility node that is not part of the actual plan. + * It is used for plans with multiple data sinks to transform it into a plan with + * a single root node. That way, the code that makes sure no costs are double-counted and that * candidate selection works correctly with nodes that have multiple outputs is transparently reused. */ public class SinkJoiner extends TwoInputNode { @@ -40,8 +41,8 @@ public class SinkJoiner extends TwoInputNode { public SinkJoiner(OptimizerNode input1, OptimizerNode input2) { super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo())); - PactConnection conn1 = new PactConnection(input1, this, null, ExecutionMode.PIPELINED); - PactConnection conn2 = new PactConnection(input2, this, null, ExecutionMode.PIPELINED); + DagConnection conn1 = new DagConnection(input1, this, null, ExecutionMode.PIPELINED); + DagConnection conn2 = new DagConnection(input2, this, null, ExecutionMode.PIPELINED); this.input1 = conn1; this.input2 = conn2; @@ -55,7 +56,7 @@ public class SinkJoiner extends TwoInputNode { } @Override - public List<PactConnection> getOutgoingConnections() { + public List<DagConnection> getOutgoingConnections() { return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java index df47e56..1292cf5 100644 --- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java @@ -46,7 +46,7 @@ public class SolutionSetNode extends AbstractPartialSolutionNode { // -------------------------------------------------------------------------------------------- public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) { - this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this, "SolutionSet("+this.getPactContract().getName()+")", gProps, lProps, initialInput)); + this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this, "SolutionSet("+this.getOperator().getName()+")", gProps, lProps, initialInput)); } public SolutionSetPlanNode getCurrentSolutionSetPlanNode() { @@ -74,8 +74,8 @@ public class SolutionSetNode extends AbstractPartialSolutionNode { * @return The contract. */ @Override - public SolutionSetPlaceHolder<?> getPactContract() { - return (SolutionSetPlaceHolder<?>) super.getPactContract(); + public SolutionSetPlaceHolder<?> getOperator() { + return (SolutionSetPlaceHolder<?>) super.getOperator(); } @Override @@ -89,7 +89,7 @@ public class SolutionSetNode extends AbstractPartialSolutionNode { return; } - PactConnection solutionSetInput = this.iterationNode.getFirstIncomingConnection(); + DagConnection solutionSetInput = this.iterationNode.getFirstIncomingConnection(); OptimizerNode solutionSetSource = solutionSetInput.getSource(); addClosedBranches(solutionSetSource.closedBranchingNodes); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java index 17bc8f1..83bc39a 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java @@ -51,8 +51,8 @@ public class SortPartitionNode extends SingleInputNode { } @Override - public SortPartitionOperatorBase<?> getPactContract() { - return (SortPartitionOperatorBase<?>) super.getPactContract(); + public SortPartitionOperatorBase<?> getOperator() { + return (SortPartitionOperatorBase<?>) super.getOperator(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java index d51f2de..39da165 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java @@ -36,7 +36,7 @@ import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.costs.CostEstimator; import org.apache.flink.optimizer.dataproperties.GlobalProperties; import org.apache.flink.optimizer.dataproperties.InterestingProperties; @@ -70,9 +70,9 @@ public abstract class TwoInputNode extends OptimizerNode { protected final FieldList keys2; // The set of key fields for the second input - protected PactConnection input1; // The first input edge + protected DagConnection input1; // 
The first input edge - protected PactConnection input2; // The second input edge + protected DagConnection input2; // The second input edge private List<OperatorDescriptorDual> cachedDescriptors; @@ -109,8 +109,8 @@ public abstract class TwoInputNode extends OptimizerNode { // ------------------------------------------------------------------------ @Override - public DualInputOperator<?, ?, ?, ?> getPactContract() { - return (DualInputOperator<?, ?, ?, ?>) super.getPactContract(); + public DualInputOperator<?, ?, ?, ?> getOperator() { + return (DualInputOperator<?, ?, ?, ?>) super.getOperator(); } /** @@ -118,7 +118,7 @@ public abstract class TwoInputNode extends OptimizerNode { * * @return The first input connection. */ - public PactConnection getFirstIncomingConnection() { + public DagConnection getFirstIncomingConnection() { return this.input1; } @@ -127,7 +127,7 @@ public abstract class TwoInputNode extends OptimizerNode { * * @return The second input connection. */ - public PactConnection getSecondIncomingConnection() { + public DagConnection getSecondIncomingConnection() { return this.input2; } @@ -148,8 +148,8 @@ public abstract class TwoInputNode extends OptimizerNode { } @Override - public List<PactConnection> getIncomingConnections() { - ArrayList<PactConnection> inputs = new ArrayList<PactConnection>(2); + public List<DagConnection> getIncomingConnections() { + ArrayList<DagConnection> inputs = new ArrayList<DagConnection>(2); inputs.add(input1); inputs.add(input2); return inputs; @@ -159,21 +159,21 @@ public abstract class TwoInputNode extends OptimizerNode { @Override public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExecutionMode) { // see if there is a hint that dictates which shipping strategy to use for BOTH inputs - final Configuration conf = getPactContract().getParameters(); + final Configuration conf = getOperator().getParameters(); ShipStrategyType preSet1 = null; ShipStrategyType preSet2 = null; - 
String shipStrategy = conf.getString(PactCompiler.HINT_SHIP_STRATEGY, null); + String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null); if (shipStrategy != null) { - if (PactCompiler.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { + if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { preSet1 = preSet2 = ShipStrategyType.FORWARD; - } else if (PactCompiler.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { + } else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { preSet1 = preSet2 = ShipStrategyType.BROADCAST; - } else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { preSet1 = preSet2 = ShipStrategyType.PARTITION_HASH; - } else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { preSet1 = preSet2 = ShipStrategyType.PARTITION_RANGE; - } else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) { + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { preSet1 = preSet2 = ShipStrategyType.PARTITION_RANDOM; } else { throw new CompilerException("Unknown hint for shipping strategy: " + shipStrategy); @@ -181,17 +181,17 @@ public abstract class TwoInputNode extends OptimizerNode { } // see if there is a hint that dictates which shipping strategy to use for the FIRST input - shipStrategy = conf.getString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, null); + shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, null); if (shipStrategy != null) { - if (PactCompiler.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { + if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { preSet1 = ShipStrategyType.FORWARD; - } else if (PactCompiler.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { + } else if 
(Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { preSet1 = ShipStrategyType.BROADCAST; - } else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { preSet1 = ShipStrategyType.PARTITION_HASH; - } else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { preSet1 = ShipStrategyType.PARTITION_RANGE; - } else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) { + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { preSet1 = ShipStrategyType.PARTITION_RANDOM; } else { throw new CompilerException("Unknown hint for shipping strategy of input one: " + shipStrategy); @@ -199,17 +199,17 @@ public abstract class TwoInputNode extends OptimizerNode { } // see if there is a hint that dictates which shipping strategy to use for the SECOND input - shipStrategy = conf.getString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, null); + shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, null); if (shipStrategy != null) { - if (PactCompiler.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { + if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { preSet2 = ShipStrategyType.FORWARD; - } else if (PactCompiler.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { + } else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { preSet2 = ShipStrategyType.BROADCAST; - } else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { preSet2 = ShipStrategyType.PARTITION_HASH; - } else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { preSet2 = 
ShipStrategyType.PARTITION_RANGE; - } else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) { + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { preSet2 = ShipStrategyType.PARTITION_RANDOM; } else { throw new CompilerException("Unknown hint for shipping strategy of input two: " + shipStrategy); @@ -217,18 +217,18 @@ public abstract class TwoInputNode extends OptimizerNode { } // get the predecessors - DualInputOperator<?, ?, ?, ?> contr = getPactContract(); + DualInputOperator<?, ?, ?, ?> contr = getOperator(); Operator<?> leftPred = contr.getFirstInput(); Operator<?> rightPred = contr.getSecondInput(); OptimizerNode pred1; - PactConnection conn1; + DagConnection conn1; if (leftPred == null) { - throw new CompilerException("Error: Node for '" + getPactContract().getName() + "' has no input set for first input."); + throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input set for first input."); } else { pred1 = contractToNode.get(leftPred); - conn1 = new PactConnection(pred1, this, defaultExecutionMode); + conn1 = new DagConnection(pred1, this, defaultExecutionMode); if (preSet1 != null) { conn1.setShipStrategy(preSet1); } @@ -239,12 +239,12 @@ public abstract class TwoInputNode extends OptimizerNode { pred1.addOutgoingConnection(conn1); OptimizerNode pred2; - PactConnection conn2; + DagConnection conn2; if (rightPred == null) { - throw new CompilerException("Error: Node for '" + getPactContract().getName() + "' has no input set for second input."); + throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input set for second input."); } else { pred2 = contractToNode.get(rightPred); - conn2 = new PactConnection(pred2, this, defaultExecutionMode); + conn2 = new DagConnection(pred2, this, defaultExecutionMode); if (preSet2 != null) { conn2.setShipStrategy(preSet2); } @@ -290,7 +290,7 @@ public abstract class TwoInputNode extends 
OptimizerNode { this.input1.setInterestingProperties(props1); this.input2.setInterestingProperties(props2); - for (PactConnection conn : getBroadcastConnections()) { + for (DagConnection conn : getBroadcastConnections()) { conn.setInterestingProperties(new InterestingProperties()); } } @@ -314,11 +314,11 @@ public abstract class TwoInputNode extends OptimizerNode { // calculate alternative sub-plans for broadcast inputs final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>(); - List<PactConnection> broadcastConnections = getBroadcastConnections(); + List<DagConnection> broadcastConnections = getBroadcastConnections(); List<String> broadcastConnectionNames = getBroadcastConnectionNames(); for (int i = 0; i < broadcastConnections.size(); i++ ) { - PactConnection broadcastConnection = broadcastConnections.get(i); + DagConnection broadcastConnection = broadcastConnections.get(i); String broadcastConnectionName = broadcastConnectionNames.get(i); List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator); @@ -352,9 +352,9 @@ public abstract class TwoInputNode extends OptimizerNode { final ExecutionMode input1Mode = this.input1.getDataExchangeMode(); final ExecutionMode input2Mode = this.input2.getDataExchangeMode(); - final int dop = getDegreeOfParallelism(); - final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism(); - final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism(); + final int dop = getParallelism(); + final int inDop1 = getFirstPredecessorNode().getParallelism(); + final int inDop2 = getSecondPredecessorNode().getParallelism(); final boolean dopChange1 = dop != inDop1; final boolean dopChange2 = dop != inDop2; @@ -720,7 +720,7 @@ public abstract class TwoInputNode extends OptimizerNode { @Override public SemanticProperties getSemanticProperties() { - return getPactContract().getSemanticProperties(); + return 
getOperator().getSemanticProperties(); } // -------------------------------------------------------------------------------------------- @@ -737,7 +737,7 @@ public abstract class TwoInputNode extends OptimizerNode { getFirstPredecessorNode().accept(visitor); getSecondPredecessorNode().accept(visitor); - for (PactConnection connection : getBroadcastConnections()) { + for (DagConnection connection : getBroadcastConnections()) { connection.getSource().accept(visitor); } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java index 2ec36b1..71a49f3 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java @@ -32,7 +32,7 @@ import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.typeinfo.NothingTypeInfo; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler.InterestingPropertyVisitor; +import org.apache.flink.optimizer.Optimizer.InterestingPropertyVisitor; import org.apache.flink.optimizer.costs.CostEstimator; import org.apache.flink.optimizer.dataproperties.GlobalProperties; import org.apache.flink.optimizer.dataproperties.InterestingProperties; @@ -80,9 +80,9 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode private OptimizerNode nextWorkset; - private PactConnection solutionSetDeltaRootConnection; + private DagConnection solutionSetDeltaRootConnection; - private PactConnection nextWorksetRootConnection; + private DagConnection 
nextWorksetRootConnection; private SingleRootJoiner singleRoot; @@ -122,7 +122,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode // -------------------------------------------------------------------------------------------- public DeltaIterationBase<?, ?> getIterationContract() { - return (DeltaIterationBase<?, ?>) getPactContract(); + return (DeltaIterationBase<?, ?>) getOperator(); } public SolutionSetNode getSolutionSetNode() { @@ -167,9 +167,9 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode // if the next workset is equal to the workset, we need to inject a no-op node if (nextWorkset == worksetNode || nextWorkset instanceof BinaryUnionNode) { NoOpNode noop = new NoOpNode(); - noop.setDegreeOfParallelism(getDegreeOfParallelism()); + noop.setDegreeOfParallelism(getParallelism()); - PactConnection noOpConn = new PactConnection(nextWorkset, noop, executionMode); + DagConnection noOpConn = new DagConnection(nextWorkset, noop, executionMode); noop.setIncomingConnection(noOpConn); nextWorkset.addOutgoingConnection(noOpConn); @@ -179,9 +179,9 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode // attach an extra node to the solution set delta for the cases where we need to repartition UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(), new SolutionSetDeltaOperator(getSolutionSetKeyFields())); - solutionSetDeltaUpdateAux.setDegreeOfParallelism(getDegreeOfParallelism()); + solutionSetDeltaUpdateAux.setDegreeOfParallelism(getParallelism()); - PactConnection conn = new PactConnection(solutionSetDelta, solutionSetDeltaUpdateAux, executionMode); + DagConnection conn = new DagConnection(solutionSetDelta, solutionSetDeltaUpdateAux, executionMode); solutionSetDeltaUpdateAux.setIncomingConnection(conn); solutionSetDelta.addOutgoingConnection(conn); @@ -189,10 +189,10 @@ public class WorksetIterationNode extends 
TwoInputNode implements IterationNode this.nextWorkset = nextWorkset; this.singleRoot = new SingleRootJoiner(); - this.solutionSetDeltaRootConnection = new PactConnection(solutionSetDeltaUpdateAux, + this.solutionSetDeltaRootConnection = new DagConnection(solutionSetDeltaUpdateAux, this.singleRoot, executionMode); - this.nextWorksetRootConnection = new PactConnection(nextWorkset, this.singleRoot, executionMode); + this.nextWorksetRootConnection = new DagConnection(nextWorkset, this.singleRoot, executionMode); this.singleRoot.setInputs(this.solutionSetDeltaRootConnection, this.nextWorksetRootConnection); solutionSetDeltaUpdateAux.addOutgoingConnection(this.solutionSetDeltaRootConnection); @@ -371,7 +371,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties", FieldList.EMPTY_LIST); - rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism()); + rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getParallelism()); SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode( rebuildWorksetPropertiesNode, "Rebuild Workset Properties", @@ -454,7 +454,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode } WorksetIterationPlanNode wsNode = new WorksetIterationPlanNode(this, - "WorksetIteration ("+this.getPactContract().getName()+")", solutionSetIn, + "WorksetIteration ("+this.getOperator().getName()+")", solutionSetIn, worksetIn, sspn, wspn, worksetCandidate, solutionSetCandidate); wsNode.setImmediateSolutionSetUpdate(immediateDeltaUpdate); wsNode.initProperties(gp, lp); @@ -566,7 +566,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode setDegreeOfParallelism(1); } - public void setInputs(PactConnection input1, PactConnection input2) { + public void setInputs(DagConnection input1, DagConnection input2) { this.input1 = 
input1; this.input2 = input2; } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java index bd39858..3b05aba 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java @@ -49,7 +49,7 @@ public class WorksetNode extends AbstractPartialSolutionNode { if (this.cachedPlans != null) { throw new IllegalStateException(); } else { - WorksetPlanNode wspn = new WorksetPlanNode(this, "Workset ("+this.getPactContract().getName()+")", gProps, lProps, initialInput); + WorksetPlanNode wspn = new WorksetPlanNode(this, "Workset ("+this.getOperator().getName()+")", gProps, lProps, initialInput); this.cachedPlans = Collections.<PlanNode>singletonList(wspn); } } @@ -79,8 +79,8 @@ public class WorksetNode extends AbstractPartialSolutionNode { * @return The contract. 
*/ @Override - public WorksetPlaceHolder<?> getPactContract() { - return (WorksetPlaceHolder<?>) super.getPactContract(); + public WorksetPlaceHolder<?> getOperator() { + return (WorksetPlaceHolder<?>) super.getOperator(); } @Override @@ -94,7 +94,7 @@ public class WorksetNode extends AbstractPartialSolutionNode { return; } - PactConnection worksetInput = this.iterationNode.getSecondIncomingConnection(); + DagConnection worksetInput = this.iterationNode.getSecondIncomingConnection(); OptimizerNode worksetSource = worksetInput.getSource(); addClosedBranches(worksetSource.closedBranchingNodes); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java index a547f04..57ba29d 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java @@ -185,11 +185,7 @@ public class GlobalProperties implements Cloneable { } public boolean isExactlyPartitionedOnFields(FieldList fields) { - if (this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields)) { - return true; - } else { - return false; - } + return this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields); } public boolean matchesOrderedPartitioning(Ordering o) {