http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java deleted file mode 100644 index d77fe1e..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java +++ /dev/null @@ -1,1372 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer; - -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; -import org.apache.flink.optimizer.dag.GroupCombineNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flink.api.common.ExecutionMode; -import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase; -import org.apache.flink.optimizer.dag.SortPartitionNode; -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.operators.GenericDataSinkBase; -import org.apache.flink.api.common.operators.GenericDataSourceBase; -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.operators.Union; -import org.apache.flink.api.common.operators.base.BulkIterationBase; -import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; -import org.apache.flink.api.common.operators.base.CrossOperatorBase; -import org.apache.flink.api.common.operators.base.DeltaIterationBase; -import org.apache.flink.api.common.operators.base.FilterOperatorBase; -import org.apache.flink.api.common.operators.base.FlatMapOperatorBase; -import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; -import org.apache.flink.api.common.operators.base.JoinOperatorBase; -import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase; -import org.apache.flink.api.common.operators.base.PartitionOperatorBase; -import org.apache.flink.api.common.operators.base.ReduceOperatorBase; -import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder; -import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder; -import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder; -import org.apache.flink.optimizer.costs.CostEstimator; 
-import org.apache.flink.optimizer.costs.DefaultCostEstimator; -import org.apache.flink.optimizer.dag.BinaryUnionNode; -import org.apache.flink.optimizer.dag.BulkIterationNode; -import org.apache.flink.optimizer.dag.BulkPartialSolutionNode; -import org.apache.flink.optimizer.dag.CoGroupNode; -import org.apache.flink.optimizer.dag.CollectorMapNode; -import org.apache.flink.optimizer.dag.CrossNode; -import org.apache.flink.optimizer.dag.DataSinkNode; -import org.apache.flink.optimizer.dag.DataSourceNode; -import org.apache.flink.optimizer.dag.FilterNode; -import org.apache.flink.optimizer.dag.FlatMapNode; -import org.apache.flink.optimizer.dag.GroupReduceNode; -import org.apache.flink.optimizer.dag.IterationNode; -import org.apache.flink.optimizer.dag.MapNode; -import org.apache.flink.optimizer.dag.MapPartitionNode; -import org.apache.flink.optimizer.dag.JoinNode; -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.optimizer.dag.PactConnection; -import org.apache.flink.optimizer.dag.PartitionNode; -import org.apache.flink.optimizer.dag.ReduceNode; -import org.apache.flink.optimizer.dag.SinkJoiner; -import org.apache.flink.optimizer.dag.SolutionSetNode; -import org.apache.flink.optimizer.dag.TempMode; -import org.apache.flink.optimizer.dag.WorksetIterationNode; -import org.apache.flink.optimizer.dag.WorksetNode; -import org.apache.flink.optimizer.deadlockdetect.DeadlockPreventer; -import org.apache.flink.optimizer.plan.BinaryUnionPlanNode; -import org.apache.flink.optimizer.plan.BulkIterationPlanNode; -import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.IterationPlanNode; -import org.apache.flink.optimizer.plan.NAryUnionPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.SinkJoinerPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plan.SolutionSetPlanNode; -import org.apache.flink.optimizer.plan.SourcePlanNode; -import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; -import org.apache.flink.optimizer.plan.WorksetPlanNode; -import org.apache.flink.optimizer.postpass.OptimizerPostPass; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Visitor; - -/** - * The optimizer that takes the user specified program plan and creates an optimized plan that contains - * exact descriptions about how the physical execution will take place. It first translates the user - * program into an internal optimizer representation and then chooses between different alternatives - * for shipping strategies and local strategies. - * <p> - * The basic principle is taken from optimizer works in systems such as Volcano/Cascades and Selinger/System-R/DB2. The - * optimizer walks from the sinks down, generating interesting properties, and ascends from the sources generating - * alternative plans, pruning against the interesting properties. - * <p> - * The optimizer also assigns the memory to the individual tasks. This is currently done in a very simple fashion: All - * sub-tasks that need memory (e.g. 
reduce or join) are given an equal share of memory. - */ -public class PactCompiler { - - // ------------------------------------------------------------------------ - // Constants - // ------------------------------------------------------------------------ - - /** - * Compiler hint key for the input channel's shipping strategy. This String is a key to the operator's stub - * parameters. The corresponding value tells the compiler which shipping strategy to use for the input channel. - * If the operator has two input channels, the shipping strategy is applied to both input channels. - */ - public static final String HINT_SHIP_STRATEGY = "INPUT_SHIP_STRATEGY"; - - /** - * Compiler hint key for the <b>first</b> input channel's shipping strategy. This String is a key to - * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy - * to use for the <b>first</b> input channel. Only applicable to operators with two inputs. - */ - public static final String HINT_SHIP_STRATEGY_FIRST_INPUT = "INPUT_LEFT_SHIP_STRATEGY"; - - /** - * Compiler hint key for the <b>second</b> input channel's shipping strategy. This String is a key to - * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy - * to use for the <b>second</b> input channel. Only applicable to operators with two inputs. - */ - public static final String HINT_SHIP_STRATEGY_SECOND_INPUT = "INPUT_RIGHT_SHIP_STRATEGY"; - - /** - * Value for the shipping strategy compiler hint that enforces a <b>Forward</b> strategy on the - * input channel, i.e. no redistribution of any kind. - * - * @see #HINT_SHIP_STRATEGY - * @see #HINT_SHIP_STRATEGY_FIRST_INPUT - * @see #HINT_SHIP_STRATEGY_SECOND_INPUT - */ - public static final String HINT_SHIP_STRATEGY_FORWARD = "SHIP_FORWARD"; - - /** - * Value for the shipping strategy compiler hint that enforces a random repartition strategy. - * - * @see #HINT_SHIP_STRATEGY - * @see #HINT_SHIP_STRATEGY_FIRST_INPUT - * @see #HINT_SHIP_STRATEGY_SECOND_INPUT - */ - public static final String HINT_SHIP_STRATEGY_REPARTITION= "SHIP_REPARTITION"; - - /** - * Value for the shipping strategy compiler hint that enforces a hash-partition strategy. - * - * @see #HINT_SHIP_STRATEGY - * @see #HINT_SHIP_STRATEGY_FIRST_INPUT - * @see #HINT_SHIP_STRATEGY_SECOND_INPUT - */ - public static final String HINT_SHIP_STRATEGY_REPARTITION_HASH = "SHIP_REPARTITION_HASH"; - - /** - * Value for the shipping strategy compiler hint that enforces a range-partition strategy. - * - * @see #HINT_SHIP_STRATEGY - * @see #HINT_SHIP_STRATEGY_FIRST_INPUT - * @see #HINT_SHIP_STRATEGY_SECOND_INPUT - */ - public static final String HINT_SHIP_STRATEGY_REPARTITION_RANGE = "SHIP_REPARTITION_RANGE"; - - /** - * Value for the shipping strategy compiler hint that enforces a <b>broadcast</b> strategy on the - * input channel. - * - * @see #HINT_SHIP_STRATEGY - * @see #HINT_SHIP_STRATEGY_FIRST_INPUT - * @see #HINT_SHIP_STRATEGY_SECOND_INPUT - */ - public static final String HINT_SHIP_STRATEGY_BROADCAST = "SHIP_BROADCAST"; - - /** - * Compiler hint key for the operator's local strategy. This String is a key to the operator's stub - * parameters. The corresponding value tells the compiler which local strategy to use to process the - * data inside one partition. - * <p> - * This hint is ignored by operators that do not have a local strategy (such as <i>Map</i>), or by operators that - * have no choice in their local strategy (such as <i>Cross</i>). 
- */ - public static final String HINT_LOCAL_STRATEGY = "LOCAL_STRATEGY"; - - /** - * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy. - * For example, a <i>Reduce</i> operator will sort the data to group it. - * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_SORT = "LOCAL_STRATEGY_SORT"; - - /** - * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy. - * During sorting a combine method is repeatedly applied to reduce the data volume. - * For example, a <i>Reduce</i> operator will sort the data to group it. - * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_COMBINING_SORT = "LOCAL_STRATEGY_COMBINING_SORT"; - - /** - * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy on both - * inputs with subsequent merging of inputs. - * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs - * of matching keys. - * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE = "LOCAL_STRATEGY_SORT_BOTH_MERGE"; - - /** - * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy. - * The first input is sorted, the second input is assumed to be sorted. After sorting both inputs are merged. - * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs - * of matching keys. - * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE = "LOCAL_STRATEGY_SORT_FIRST_MERGE"; - - /** - * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy. - * The second input is sorted, the first input is assumed to be sorted. After sorting both inputs are merged. - * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs - * of matching keys. - * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE = "LOCAL_STRATEGY_SORT_SECOND_MERGE"; - - /** - * Value for the local strategy compiler hint that enforces a <b>merge based</b> local strategy. - * Both inputs are assumed to be sorted and are merged. - * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a merge strategy to find pairs - * of matching keys. - * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_MERGE = "LOCAL_STRATEGY_MERGE"; - - - /** - * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy. - * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of - * matching keys. The <b>first</b> input will be used to build the hash table, the second input will be - * used to probe the table. - * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST = "LOCAL_STRATEGY_HASH_BUILD_FIRST"; - - /** - * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy. - * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of - * matching keys. The <b>second</b> input will be used to build the hash table, the first input will be - * used to probe the table. 
- * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND = "LOCAL_STRATEGY_HASH_BUILD_SECOND"; - - /** - * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy. - * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops. - * Hence, the data of the first input will be is streamed though, while the data of the second input is stored on - * disk - * and repeatedly read. - * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST"; - - /** - * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy. - * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops. - * Hence, the data of the second input will be is streamed though, while the data of the first input is stored on - * disk - * and repeatedly read. - * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND"; - - /** - * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy. - * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops. - * Further more, the first input, being the outer side, will be processed in blocks, and for each block, the second - * input, - * being the inner side, will read repeatedly from disk. - * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST"; - - /** - * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy. - * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops. - * Further more, the second input, being the outer side, will be processed in blocks, and for each block, the first - * input, - * being the inner side, will read repeatedly from disk. - * - * @see #HINT_LOCAL_STRATEGY - */ - public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND"; - - /** - * The log handle that is used by the compiler to log messages. - */ - public static final Logger LOG = LoggerFactory.getLogger(PactCompiler.class); - - // ------------------------------------------------------------------------ - // Members - // ------------------------------------------------------------------------ - - /** - * The statistics object used to obtain statistics, such as input sizes, - * for the cost estimation process. - */ - private final DataStatistics statistics; - - /** - * The cost estimator used by the compiler. - */ - private final CostEstimator costEstimator; - - /** - * The default degree of parallelism for jobs compiled by this compiler. - */ - private int defaultDegreeOfParallelism; - - - // ------------------------------------------------------------------------ - // Constructor & Setup - // ------------------------------------------------------------------------ - - /** - * Creates a new optimizer instance. The optimizer has no access to statistics about the - * inputs and can hence not determine any properties. 
It will perform all optimization with - * unknown sizes and hence use only the heuristic cost functions, which result in the selection - * of the most robust execution strategies. - */ - public PactCompiler() { - this(null, new DefaultCostEstimator()); - } - - /** - * Creates a new optimizer instance that uses the statistics object to determine properties about the input. - * Given those statistics, the optimizer can make better choices for the execution strategies. - * - * @param stats - * The statistics to be used to determine the input properties. - */ - public PactCompiler(DataStatistics stats) { - this(stats, new DefaultCostEstimator()); - } - - /** - * Creates a new optimizer instance. The optimizer has no access to statistics about the - * inputs and can hence not determine any properties. It will perform all optimization with - * unknown sizes and hence use only the heuristic cost functions, which result in the selection - * of the most robust execution strategies. - * - * The optimizer uses the given cost estimator to compute the costs of the individual operations. - * - * @param estimator The cost estimator to use to cost the individual operations. - */ - public PactCompiler(CostEstimator estimator) { - this(null, estimator); - } - - /** - * Creates a new optimizer instance that uses the statistics object to determine properties about the input. - * Given those statistics, the optimizer can make better choices for the execution strategies. - * - * The optimizer uses the given cost estimator to compute the costs of the individual operations. - * - * @param stats - * The statistics to be used to determine the input properties. - * @param estimator - * The <tt>CostEstimator</tt> to use to cost the individual operations. - */ - public PactCompiler(DataStatistics stats, CostEstimator estimator) { - this.statistics = stats; - this.costEstimator = estimator; - - // determine the default parallelism - this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger( - ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, - ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE); - - if (defaultDegreeOfParallelism < 1) { - LOG.warn("Config value " + defaultDegreeOfParallelism + " for option " - + ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE + " is invalid. Ignoring and using a value of 1."); - this.defaultDegreeOfParallelism = 1; - } - } - - // ------------------------------------------------------------------------ - // Getters / Setters - // ------------------------------------------------------------------------ - - public int getDefaultDegreeOfParallelism() { - return defaultDegreeOfParallelism; - } - - public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) { - if (defaultDegreeOfParallelism > 0) { - this.defaultDegreeOfParallelism = defaultDegreeOfParallelism; - } else { - throw new IllegalArgumentException("Default parallelism cannot be zero or negative."); - } - } - - // ------------------------------------------------------------------------ - // Compilation - // ------------------------------------------------------------------------ - - /** - * Translates the given program to an OptimizedPlan, where all nodes have their local strategy assigned - * and all channels have a shipping strategy assigned. - * - * For more details on the optimization phase, see the comments for - * {@link #compile(org.apache.flink.api.common.Plan, org.apache.flink.optimizer.postpass.OptimizerPostPass)}. - * - * @param program The program to be translated. - * @return The optimized plan. 
- * - * @throws CompilerException - * Thrown, if the plan is invalid or the optimizer encountered an inconsistent - * situation during the compilation process. - */ - public OptimizedPlan compile(Plan program) throws CompilerException { - final OptimizerPostPass postPasser = getPostPassFromPlan(program); - return compile(program, postPasser); - } - - /** - * Translates the given program to an OptimizedPlan. The optimized plan describes for each operator - * which strategy to use (such as hash join versus sort-merge join), what data exchange method to use - * (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined, batch), - * where to cache intermediate results, etc, - * - * The optimization happens in multiple phases: - * <ol> - * <li>Create optimizer dag implementation of the program. - * - * <tt>OptimizerNode</tt> representations of the PACTs, assign parallelism and compute size estimates.</li> - * <li>Compute interesting properties and auxiliary structures.</li> - * <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting property computation (as - * opposed to the Database approaches), because we support plans that are not trees.</li> - * </ol> - * - * @param program The program to be translated. - * @param postPasser The function to be used for post passing the optimizer's plan and setting the - * data type specific serialization routines. - * @return The optimized plan. - * - * @throws CompilerException - * Thrown, if the plan is invalid or the optimizer encountered an inconsistent - * situation during the compilation process. - */ - private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException { - if (program == null || postPasser == null) { - throw new NullPointerException(); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Beginning compilation of program '" + program.getJobName() + '\''); - } - - final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode(); - - final int defaultParallelism = program.getDefaultParallelism() > 0 ? - program.getDefaultParallelism() : this.defaultDegreeOfParallelism; - - // log the default settings - LOG.debug("Using a default parallelism of {}", defaultParallelism); - LOG.debug("Using default data exchange mode {}", defaultDataExchangeMode); - - // the first step in the compilation is to create the optimizer plan representation - // this step does the following: - // 1) It creates an optimizer plan node for each operator - // 2) It connects them via channels - // 3) It looks for hints about local strategies and channel types and - // sets the types and strategies accordingly - // 4) It makes estimates about the data volume of the data sources and - // propagates those estimates through the plan - - GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode); - program.accept(graphCreator); - - // if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children - // each until we have only a single root node. 
This allows to transparently deal with the nodes with - // multiple outputs - OptimizerNode rootNode; - if (graphCreator.sinks.size() == 1) { - rootNode = graphCreator.sinks.get(0); - } else if (graphCreator.sinks.size() > 1) { - Iterator<DataSinkNode> iter = graphCreator.sinks.iterator(); - rootNode = iter.next(); - - while (iter.hasNext()) { - rootNode = new SinkJoiner(rootNode, iter.next()); - } - } else { - throw new CompilerException("Bug: The optimizer plan representation has no sinks."); - } - - // now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal - // guaranteed memory, for further cost estimations. we assume an equal distribution of memory among consumer tasks - rootNode.accept(new IdAndEstimatesVisitor(this.statistics)); - - // We are dealing with operator DAGs, rather than operator trees. - // That requires us to deviate at some points from the classical DB optimizer algorithms. - // This step build some auxiliary structures to help track branches and joins in the DAG - BranchesVisitor branchingVisitor = new BranchesVisitor(); - rootNode.accept(branchingVisitor); - - // Propagate the interesting properties top-down through the graph - InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator); - rootNode.accept(propsVisitor); - - // perform a sanity check: the root may not have any unclosed branches - if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) { - throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " + - "track the re-joining of branches correctly."); - } - - // the final step is now to generate the actual plan alternatives - List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator); - - if (bestPlan.size() != 1) { - throw new CompilerException("Error in compiler: more than one best plan was created!"); - } - - // check if the best plan's root is a data sink (single sink plan) - // if so, directly take it. if it is a sink joiner node, get its contained sinks - PlanNode bestPlanRoot = bestPlan.get(0); - List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4); - - if (bestPlanRoot instanceof SinkPlanNode) { - bestPlanSinks.add((SinkPlanNode) bestPlanRoot); - } else if (bestPlanRoot instanceof SinkJoinerPlanNode) { - ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks); - } - - DeadlockPreventer dp = new DeadlockPreventer(); - dp.resolveDeadlocks(bestPlanSinks); - - // finalize the plan - OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program); - - plan.accept(new BinaryUnionReplacer()); - - // post pass the plan. this is the phase where the serialization and comparator code is set - postPasser.postPass(plan); - - return plan; - } - - /** - * This function performs only the first step to the compilation process - the creation of the optimizer - * representation of the plan. No estimations or enumerations of alternatives are done here. - * - * @param program The plan to generate the optimizer representation for. - * @return The optimizer representation of the plan, as a collection of all data sinks - * from the plan can be traversed. 
- */ - public static List<DataSinkNode> createPreOptimizedPlan(Plan program) { - GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(1, null); - program.accept(graphCreator); - return graphCreator.sinks; - } - - // ------------------------------------------------------------------------ - // Visitors for Compilation Traversals - // ------------------------------------------------------------------------ - - /** - * This utility class performs the translation from the user specified program to the optimizer plan. - * It works as a visitor that walks the user's job in a depth-first fashion. During the descend, it creates - * an optimizer node for each operator, respectively data source or -sink. During the ascend, it connects - * the nodes to the full graph. - * <p> - * This translator relies on the <code>setInputs</code> method in the nodes. As that method implements the size - * estimation and the awareness for optimizer hints, the sizes will be properly estimated and the translated plan - * already respects all optimizer hints. - */ - public static final class GraphCreatingVisitor implements Visitor<Operator<?>> { - - private final Map<Operator<?>, OptimizerNode> con2node; // map from the operator objects to their - // corresponding optimizer nodes - - private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan - - private final int defaultParallelism; // the default degree of parallelism - - private final GraphCreatingVisitor parent; // reference to enclosing creator, in case of a recursive translation - - private final ExecutionMode defaultDataExchangeMode; - - private final boolean forceDOP; - - - public GraphCreatingVisitor(int defaultParallelism, ExecutionMode defaultDataExchangeMode) { - this(null, false, defaultParallelism, defaultDataExchangeMode, null); - } - - private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int defaultParallelism, - ExecutionMode dataExchangeMode, HashMap<Operator<?>, OptimizerNode> closure) { - if (closure == null){ - con2node = new HashMap<Operator<?>, OptimizerNode>(); - } else { - con2node = closure; - } - - this.sinks = new ArrayList<DataSinkNode>(2); - this.defaultParallelism = defaultParallelism; - this.parent = parent; - this.defaultDataExchangeMode = dataExchangeMode; - this.forceDOP = forceDOP; - } - - public List<DataSinkNode> getSinks() { - return sinks; - } - - @SuppressWarnings("deprecation") - @Override - public boolean preVisit(Operator<?> c) { - // check if we have been here before - if (this.con2node.containsKey(c)) { - return false; - } - - final OptimizerNode n; - - // create a node for the operator (or sink or source) if we have not been here before - if (c instanceof GenericDataSinkBase) { - DataSinkNode dsn = new DataSinkNode((GenericDataSinkBase<?>) c); - this.sinks.add(dsn); - n = dsn; - } - else if (c instanceof GenericDataSourceBase) { - n = new DataSourceNode((GenericDataSourceBase<?, ?>) c); - } - else if (c instanceof MapOperatorBase) { - n = new MapNode((MapOperatorBase<?, ?, ?>) c); - } - else if (c instanceof MapPartitionOperatorBase) { - n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c); - } - else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) { - n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c); - } - else if (c instanceof FlatMapOperatorBase) { - n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c); - } - else if (c instanceof FilterOperatorBase) { - n = 
new FilterNode((FilterOperatorBase<?, ?>) c); - } - else if (c instanceof ReduceOperatorBase) { - n = new ReduceNode((ReduceOperatorBase<?, ?>) c); - } - else if (c instanceof GroupReduceOperatorBase) { - n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c); - } - else if (c instanceof GroupCombineOperatorBase) { - n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c); - } - else if (c instanceof JoinOperatorBase) { - n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c); - } - else if (c instanceof CoGroupOperatorBase) { - n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c); - } - else if (c instanceof CrossOperatorBase) { - n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c); - } - else if (c instanceof BulkIterationBase) { - n = new BulkIterationNode((BulkIterationBase<?>) c); - } - else if (c instanceof DeltaIterationBase) { - n = new WorksetIterationNode((DeltaIterationBase<?, ?>) c); - } - else if (c instanceof Union){ - n = new BinaryUnionNode((Union<?>) c); - } - else if (c instanceof PartitionOperatorBase) { - n = new PartitionNode((PartitionOperatorBase<?>) c); - } - else if (c instanceof SortPartitionOperatorBase) { - n = new SortPartitionNode((SortPartitionOperatorBase<?>) c); - } - else if (c instanceof PartialSolutionPlaceHolder) { - if (this.parent == null) { - throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations."); - } - - final PartialSolutionPlaceHolder<?> holder = (PartialSolutionPlaceHolder<?>) c; - final BulkIterationBase<?> enclosingIteration = holder.getContainingBulkIteration(); - final BulkIterationNode containingIterationNode = - (BulkIterationNode) this.parent.con2node.get(enclosingIteration); - - // catch this for the recursive translation of step functions - BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode); - p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism()); - n = p; - } - else if (c instanceof WorksetPlaceHolder) { - if (this.parent == null) { - throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations."); - } - - final WorksetPlaceHolder<?> holder = (WorksetPlaceHolder<?>) c; - final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration(); - final WorksetIterationNode containingIterationNode = - (WorksetIterationNode) this.parent.con2node.get(enclosingIteration); - - // catch this for the recursive translation of step functions - WorksetNode p = new WorksetNode(holder, containingIterationNode); - p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism()); - n = p; - } - else if (c instanceof SolutionSetPlaceHolder) { - if (this.parent == null) { - throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations."); - } - - final SolutionSetPlaceHolder<?> holder = (SolutionSetPlaceHolder<?>) c; - final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration(); - final WorksetIterationNode containingIterationNode = - (WorksetIterationNode) this.parent.con2node.get(enclosingIteration); - - // catch this for the recursive translation of step functions - SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode); - p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism()); - n = p; - } - else { - throw new IllegalArgumentException("Unknown operator type: " + c); - } - - this.con2node.put(c, n); - - // set the parallelism only if it has not 
been set before. some nodes have a fixed DOP, such as the - // key-less reducer (all-reduce) - if (n.getDegreeOfParallelism() < 1) { - // set the degree of parallelism - int par = c.getDegreeOfParallelism(); - if (par > 0) { - if (this.forceDOP && par != this.defaultParallelism) { - par = this.defaultParallelism; - LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " + - "currently fixed to the parallelism of the surrounding operator (the iteration)."); - } - } else { - par = this.defaultParallelism; - } - n.setDegreeOfParallelism(par); - } - - return true; - } - - @Override - public void postVisit(Operator<?> c) { - - OptimizerNode n = this.con2node.get(c); - - // first connect to the predecessors - n.setInput(this.con2node, this.defaultDataExchangeMode); - n.setBroadcastInputs(this.con2node, this.defaultDataExchangeMode); - - // if the node represents a bulk iteration, we recursively translate the data flow now - if (n instanceof BulkIterationNode) { - final BulkIterationNode iterNode = (BulkIterationNode) n; - final BulkIterationBase<?> iter = iterNode.getIterationContract(); - - // pass a copy of the no iterative part into the iteration translation, - // in case the iteration references its closure - HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node); - - // first, recursively build the data flow for the step function - final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true, - iterNode.getDegreeOfParallelism(), defaultDataExchangeMode, closure); - - BulkPartialSolutionNode partialSolution; - - iter.getNextPartialSolution().accept(recursiveCreator); - - partialSolution = (BulkPartialSolutionNode) recursiveCreator.con2node.get(iter.getPartialSolution()); - OptimizerNode rootOfStepFunction = recursiveCreator.con2node.get(iter.getNextPartialSolution()); - if (partialSolution == null) { - throw new CompilerException("Error: The step functions result does not depend on the partial solution."); - } - - - OptimizerNode terminationCriterion = null; - - if (iter.getTerminationCriterion() != null) { - terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion()); - - // no intermediate node yet, traverse from the termination criterion to build the missing parts - if (terminationCriterion == null) { - iter.getTerminationCriterion().accept(recursiveCreator); - terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion()); - } - } - - iterNode.setPartialSolution(partialSolution); - iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion); - - // go over the contained data flow and mark the dynamic path nodes - StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight()); - iterNode.acceptForStepFunction(identifier); - } - else if (n instanceof WorksetIterationNode) { - final WorksetIterationNode iterNode = (WorksetIterationNode) n; - final DeltaIterationBase<?, ?> iter = iterNode.getIterationContract(); - - // we need to ensure that both the next-workset and the solution-set-delta depend on the workset. - // One check is for free during the translation, we do the other check here as a pre-condition - { - StepFunctionValidator wsf = new StepFunctionValidator(); - iter.getNextWorkset().accept(wsf); - if (!wsf.foundWorkset) { - throw new CompilerException("In the given program, the next workset does not depend on the workset. 
" + - "This is a prerequisite in delta iterations."); - } - } - - // calculate the closure of the anonymous function - HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node); - - // first, recursively build the data flow for the step function - final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor( - this, true, iterNode.getDegreeOfParallelism(), defaultDataExchangeMode, closure); - - // descend from the solution set delta. check that it depends on both the workset - // and the solution set. If it does depend on both, this descend should create both nodes - iter.getSolutionSetDelta().accept(recursiveCreator); - - final WorksetNode worksetNode = (WorksetNode) recursiveCreator.con2node.get(iter.getWorkset()); - - if (worksetNode == null) { - throw new CompilerException("In the given program, the solution set delta does not depend on the workset." + - "This is a prerequisite in delta iterations."); - } - - iter.getNextWorkset().accept(recursiveCreator); - - SolutionSetNode solutionSetNode = (SolutionSetNode) recursiveCreator.con2node.get(iter.getSolutionSet()); - - if (solutionSetNode == null || solutionSetNode.getOutgoingConnections() == null || solutionSetNode.getOutgoingConnections().isEmpty()) { - solutionSetNode = new SolutionSetNode((SolutionSetPlaceHolder<?>) iter.getSolutionSet(), iterNode); - } - else { - for (PactConnection conn : solutionSetNode.getOutgoingConnections()) { - OptimizerNode successor = conn.getTarget(); - - if (successor.getClass() == JoinNode.class) { - // find out which input to the match the solution set is - JoinNode mn = (JoinNode) successor; - if (mn.getFirstPredecessorNode() == solutionSetNode) { - mn.makeJoinWithSolutionSet(0); - } else if (mn.getSecondPredecessorNode() == solutionSetNode) { - mn.makeJoinWithSolutionSet(1); - } else { - throw new CompilerException(); - } - } - else if (successor.getClass() == CoGroupNode.class) { - CoGroupNode cg = (CoGroupNode) successor; - if (cg.getFirstPredecessorNode() == solutionSetNode) { - cg.makeCoGroupWithSolutionSet(0); - } else if (cg.getSecondPredecessorNode() == solutionSetNode) { - cg.makeCoGroupWithSolutionSet(1); - } else { - throw new CompilerException(); - } - } - else { - throw new InvalidProgramException( - "Error: The only operations allowed on the solution set are Join and CoGroup."); - } - } - } - - final OptimizerNode nextWorksetNode = recursiveCreator.con2node.get(iter.getNextWorkset()); - final OptimizerNode solutionSetDeltaNode = recursiveCreator.con2node.get(iter.getSolutionSetDelta()); - - // set the step function nodes to the iteration node - iterNode.setPartialSolution(solutionSetNode, worksetNode); - iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode, defaultDataExchangeMode); - - // go over the contained data flow and mark the dynamic path nodes - StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight()); - iterNode.acceptForStepFunction(pathIdentifier); - } - } - } - - private static final class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> { - - private final Set<OptimizerNode> seenBefore = new HashSet<OptimizerNode>(); - - private final int costWeight; - - private StaticDynamicPathIdentifier(int costWeight) { - this.costWeight = costWeight; - } - - @Override - public boolean preVisit(OptimizerNode visitable) { - return this.seenBefore.add(visitable); - } - - @Override - public void postVisit(OptimizerNode visitable) { - 
visitable.identifyDynamicPath(this.costWeight); - - // check that there is no nested iteration on the dynamic path - if (visitable.isOnDynamicPath() && visitable instanceof IterationNode) { - throw new CompilerException("Nested iterations are currently not supported."); - } - } - } - - /** - * Simple visitor that sets the minimal guaranteed memory per task based on the amount of available memory, - * the number of memory consumers, and on the task's degree of parallelism. - */ - public static final class IdAndEstimatesVisitor implements Visitor<OptimizerNode> { - - private final DataStatistics statistics; - - private int id = 1; - - public IdAndEstimatesVisitor(DataStatistics statistics) { - this.statistics = statistics; - } - - @Override - public boolean preVisit(OptimizerNode visitable) { - return visitable.getId() == -1; - } - - @Override - public void postVisit(OptimizerNode visitable) { - // the node ids - visitable.initId(this.id++); - - // connections need to figure out their maximum path depths - for (PactConnection conn : visitable.getIncomingConnections()) { - conn.initMaxDepth(); - } - for (PactConnection conn : visitable.getBroadcastConnections()) { - conn.initMaxDepth(); - } - - // the estimates - visitable.computeOutputEstimates(this.statistics); - - // if required, recurse into the step function - if (visitable instanceof IterationNode) { - ((IterationNode) visitable).acceptForStepFunction(this); - } - } - } - - /** - * Visitor that computes the interesting properties for each node in the plan. On its recursive - * depth-first descend, it propagates all interesting properties top-down. - */ - public static final class InterestingPropertyVisitor implements Visitor<OptimizerNode> { - - private CostEstimator estimator; // the cost estimator for maximal costs of an interesting property - - /** - * Creates a new visitor that computes the interesting properties for all nodes in the plan. - * It uses the given cost estimator used to compute the maximal costs for an interesting property. - * - * @param estimator - * The cost estimator to estimate the maximal costs for interesting properties. - */ - public InterestingPropertyVisitor(CostEstimator estimator) { - this.estimator = estimator; - } - - @Override - public boolean preVisit(OptimizerNode node) { - // The interesting properties must be computed on the descend. In case a node has multiple outputs, - // that computation must happen during the last descend. - - if (node.getInterestingProperties() == null && node.haveAllOutputConnectionInterestingProperties()) { - node.computeUnionOfInterestingPropertiesFromSuccessors(); - node.computeInterestingPropertiesForInputs(this.estimator); - return true; - } else { - return false; - } - } - - @Override - public void postVisit(OptimizerNode visitable) {} - } - - /** - * On its re-ascend (post visit) this visitor, computes auxiliary maps that are needed to support plans - * that are not a minimally connected DAG (Such plans are not trees, but at least one node feeds its - * output into more than one other node). 
- */ - public static final class BranchesVisitor implements Visitor<OptimizerNode> { - - @Override - public boolean preVisit(OptimizerNode node) { - return node.getOpenBranches() == null; - } - - @Override - public void postVisit(OptimizerNode node) { - if (node instanceof IterationNode) { - ((IterationNode) node).acceptForStepFunction(this); - } - - node.computeUnclosedBranchStack(); - } - } - - /** - * Finalization of the plan: - * - The graph of nodes is double-linked (links from child to parent are inserted) - * - If unions join static and dynamic paths, the cache is marked as a memory consumer - * - Relative memory fractions are assigned to all nodes. - * - All nodes are collected into a set. - */ - private static final class PlanFinalizer implements Visitor<PlanNode> { - - private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan - - private final List<SourcePlanNode> sources; // all data source nodes in the optimizer plan - - private final List<SinkPlanNode> sinks; // all data sink nodes in the optimizer plan - - private final Deque<IterationPlanNode> stackOfIterationNodes; - - private int memoryConsumerWeights; // a counter of all memory consumers - - /** - * Creates a new plan finalizer. - */ - private PlanFinalizer() { - this.allNodes = new HashSet<PlanNode>(); - this.sources = new ArrayList<SourcePlanNode>(); - this.sinks = new ArrayList<SinkPlanNode>(); - this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>(); - } - - private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) { - this.memoryConsumerWeights = 0; - - // traverse the graph - for (SinkPlanNode node : sinks) { - node.accept(this); - } - - // assign the memory to each node - if (this.memoryConsumerWeights > 0) { - for (PlanNode node : this.allNodes) { - // assign memory to the driver strategy of the node - final int consumerWeight = node.getMemoryConsumerWeight(); - if (consumerWeight > 0) { - final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights; - node.setRelativeMemoryPerSubtask(relativeMem); - if (LOG.isDebugEnabled()) { - LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " + - node.getPactContract().getName() + "."); - } - } - - // assign memory to the local and global strategies of the channels - for (Channel c : node.getInputs()) { - if (c.getLocalStrategy().dams()) { - final double relativeMem = 1.0 / this.memoryConsumerWeights; - c.setRelativeMemoryLocalStrategy(relativeMem); - if (LOG.isDebugEnabled()) { - LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " + - "instance of " + c + "."); - } - } - if (c.getTempMode() != TempMode.NONE) { - final double relativeMem = 1.0/ this.memoryConsumerWeights; - c.setRelativeTempMemory(relativeMem); - if (LOG.isDebugEnabled()) { - LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " + - "table for " + c + "."); - } - } - } - } - } - return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan); - } - - @Override - public boolean preVisit(PlanNode visitable) { - // if we come here again, prevent a further descend - if (!this.allNodes.add(visitable)) { - return false; - } - - if (visitable instanceof SinkPlanNode) { - this.sinks.add((SinkPlanNode) visitable); - } - else if (visitable instanceof SourcePlanNode) { - this.sources.add((SourcePlanNode) visitable); - } - else if (visitable instanceof BinaryUnionPlanNode) { - BinaryUnionPlanNode unionNode = 
(BinaryUnionPlanNode) visitable; - if (unionNode.unionsStaticAndDynamicPath()) { - unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED); - } - } - else if (visitable instanceof BulkPartialSolutionPlanNode) { - // tell the partial solution about the iteration node that contains it - final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable; - final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast(); - - // sanity check! - if (iteration == null || !(iteration instanceof BulkIterationPlanNode)) { - throw new CompilerException("Bug: Error finalizing the plan. " + - "Cannot associate the node for a partial solutions with its containing iteration."); - } - pspn.setContainingIterationNode((BulkIterationPlanNode) iteration); - } - else if (visitable instanceof WorksetPlanNode) { - // tell the partial solution about the iteration node that contains it - final WorksetPlanNode wspn = (WorksetPlanNode) visitable; - final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast(); - - // sanity check! - if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) { - throw new CompilerException("Bug: Error finalizing the plan. " + - "Cannot associate the node for a partial solutions with its containing iteration."); - } - wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration); - } - else if (visitable instanceof SolutionSetPlanNode) { - // tell the partial solution about the iteration node that contains it - final SolutionSetPlanNode sspn = (SolutionSetPlanNode) visitable; - final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast(); - - // sanity check! - if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) { - throw new CompilerException("Bug: Error finalizing the plan. " + - "Cannot associate the node for a partial solutions with its containing iteration."); - } - sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration); - } - - // double-connect the connections. previously, only parents knew their children, because - // one child candidate could have been referenced by multiple parents. - for (Channel conn : visitable.getInputs()) { - conn.setTarget(visitable); - conn.getSource().addOutgoingChannel(conn); - } - - for (Channel c : visitable.getBroadcastInputs()) { - c.setTarget(visitable); - c.getSource().addOutgoingChannel(c); - } - - // count the memory consumption - this.memoryConsumerWeights += visitable.getMemoryConsumerWeight(); - for (Channel c : visitable.getInputs()) { - if (c.getLocalStrategy().dams()) { - this.memoryConsumerWeights++; - } - if (c.getTempMode() != TempMode.NONE) { - this.memoryConsumerWeights++; - } - } - for (Channel c : visitable.getBroadcastInputs()) { - if (c.getLocalStrategy().dams()) { - this.memoryConsumerWeights++; - } - if (c.getTempMode() != TempMode.NONE) { - this.memoryConsumerWeights++; - } - } - - // pass the visitor to the iteraton's step function - if (visitable instanceof IterationPlanNode) { - // push the iteration node onto the stack - final IterationPlanNode iterNode = (IterationPlanNode) visitable; - this.stackOfIterationNodes.addLast(iterNode); - - // recurse - ((IterationPlanNode) visitable).acceptForStepFunction(this); - - // pop the iteration node from the stack - this.stackOfIterationNodes.removeLast(); - } - return true; - } - - @Override - public void postVisit(PlanNode visitable) {} - } - - /** - * A visitor that traverses the graph and collects cascading binary unions into a single n-ary - * union operator. 
The exception is, when on of the union inputs is materialized, such as in the - * static-code-path-cache in iterations. - */ - private static final class BinaryUnionReplacer implements Visitor<PlanNode> { - - private final Set<PlanNode> seenBefore = new HashSet<PlanNode>(); - - @Override - public boolean preVisit(PlanNode visitable) { - if (this.seenBefore.add(visitable)) { - if (visitable instanceof IterationPlanNode) { - ((IterationPlanNode) visitable).acceptForStepFunction(this); - } - return true; - } else { - return false; - } - } - - @Override - public void postVisit(PlanNode visitable) { - - if (visitable instanceof BinaryUnionPlanNode) { - - final BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable; - final Channel in1 = unionNode.getInput1(); - final Channel in2 = unionNode.getInput2(); - - if (!unionNode.unionsStaticAndDynamicPath()) { - - // both on static path, or both on dynamic path. we can collapse them - NAryUnionPlanNode newUnionNode; - - List<Channel> inputs = new ArrayList<Channel>(); - collect(in1, inputs); - collect(in2, inputs); - - newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs, - unionNode.getGlobalProperties(), unionNode.getCumulativeCosts()); - - newUnionNode.setDegreeOfParallelism(unionNode.getDegreeOfParallelism()); - - for (Channel c : inputs) { - c.setTarget(newUnionNode); - } - - for (Channel channel : unionNode.getOutgoingChannels()) { - channel.swapUnionNodes(newUnionNode); - newUnionNode.addOutgoingChannel(channel); - } - } - else { - // union between the static and the dynamic path. we need to handle this for now - // through a special union operator - - // make sure that the first input is the cached (static) and the second input is the dynamic - if (in1.isOnDynamicPath()) { - BinaryUnionPlanNode newUnionNode = new BinaryUnionPlanNode(unionNode); - - in1.setTarget(newUnionNode); - in2.setTarget(newUnionNode); - - for (Channel channel : unionNode.getOutgoingChannels()) { - channel.swapUnionNodes(newUnionNode); - newUnionNode.addOutgoingChannel(channel); - } - } - } - } - } - - private void collect(Channel in, List<Channel> inputs) { - if (in.getSource() instanceof NAryUnionPlanNode) { - // sanity check - if (in.getShipStrategy() != ShipStrategyType.FORWARD) { - throw new CompilerException("Bug: Plan generation for Unions picked a ship strategy between binary plan operators."); - } - if (!(in.getLocalStrategy() == null || in.getLocalStrategy() == LocalStrategy.NONE)) { - throw new CompilerException("Bug: Plan generation for Unions picked a local strategy between binary plan operators."); - } - - inputs.addAll(((NAryUnionPlanNode) in.getSource()).getListOfInputs()); - } else { - // is not a collapsed union node, so we take the channel directly - inputs.add(in); - } - } - } - - private static final class StepFunctionValidator implements Visitor<Operator<?>> { - - private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>(); - - private boolean foundWorkset; - - @Override - public boolean preVisit(Operator<?> visitable) { - if (visitable instanceof WorksetPlaceHolder) { - foundWorkset = true; - } - - return (!foundWorkset) && seenBefore.add(visitable); - } - - @Override - public void postVisit(Operator<?> visitable) {} - } - - // ------------------------------------------------------------------------ - // Miscellaneous - // ------------------------------------------------------------------------ - - private OptimizerPostPass getPostPassFromPlan(Plan program) { - final String className = 
program.getPostPassClassName(); - if (className == null) { - throw new CompilerException("Optimizer Post Pass class description is null"); - } - try { - Class<? extends OptimizerPostPass> clazz = Class.forName(className).asSubclass(OptimizerPostPass.class); - try { - return InstantiationUtil.instantiate(clazz, OptimizerPostPass.class); - } catch (RuntimeException rtex) { - // unwrap the source exception - if (rtex.getCause() != null) { - throw new CompilerException("Cannot instantiate optimizer post pass: " + rtex.getMessage(), rtex.getCause()); - } else { - throw rtex; - } - } - } catch (ClassNotFoundException cnfex) { - throw new CompilerException("Cannot load Optimizer post-pass class '" + className + "'.", cnfex); - } catch (ClassCastException ccex) { - throw new CompilerException("Class '" + className + "' is not an optimizer post passer.", ccex); - } - } -}
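For orientation, the snippet below is a minimal usage sketch of the class deleted above; it is not part of this commit. The no-argument constructor, setDefaultDegreeOfParallelism(int) and compile(Plan) are taken from the removed file, while the wrapping class, the Plan argument and the commented hint call are placeholders. Compiler hints such as HINT_SHIP_STRATEGY or HINT_LOCAL_STRATEGY are plain String entries in an operator's stub parameters, set before compiling.

    import org.apache.flink.api.common.Plan;
    import org.apache.flink.optimizer.CompilerException;
    import org.apache.flink.optimizer.PactCompiler;
    import org.apache.flink.optimizer.plan.OptimizedPlan;

    public class CompileSketch {

        public static OptimizedPlan optimize(Plan program) throws CompilerException {
            // Without a DataStatistics object the compiler uses only the heuristic
            // cost functions described in the Javadoc of the deleted class.
            PactCompiler compiler = new PactCompiler();
            compiler.setDefaultDegreeOfParallelism(4); // must be positive, otherwise IllegalArgumentException

            // Optional hints live in an operator's stub parameters, e.g. (assuming the
            // operator exposes setParameter for its stub configuration):
            //   join.setParameter(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT,
            //                     PactCompiler.HINT_SHIP_STRATEGY_BROADCAST);

            // Throws CompilerException if the plan is invalid or the optimizer
            // runs into an inconsistent state.
            return compiler.compile(program);
        }
    }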
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java index f8404c4..d199ae7 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.optimizer.dag; import java.util.Collections; @@ -62,8 +61,8 @@ public abstract class AbstractPartialSolutionNode extends OptimizerNode { } @Override - public List<PactConnection> getIncomingConnections() { - return Collections.<PactConnection>emptyList(); + public List<DagConnection> getIncomingConnections() { + return Collections.emptyList(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java index 8aed10c..068799e 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java @@ -107,7 +107,7 @@ public class BinaryUnionNode extends TwoInputNode { final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator); final List<? 
extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator); - List<PactConnection> broadcastConnections = getBroadcastConnections(); + List<DagConnection> broadcastConnections = getBroadcastConnections(); if (broadcastConnections != null && broadcastConnections.size() > 0) { throw new CompilerException("Found BroadcastVariables on a Union operation"); } @@ -122,9 +122,9 @@ public class BinaryUnionNode extends TwoInputNode { final ExecutionMode input1Mode = this.input1.getDataExchangeMode(); final ExecutionMode input2Mode = this.input2.getDataExchangeMode(); - final int dop = getDegreeOfParallelism(); - final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism(); - final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism(); + final int dop = getParallelism(); + final int inDop1 = getFirstPredecessorNode().getParallelism(); + final int inDop2 = getSecondPredecessorNode().getParallelism(); final boolean dopChange1 = dop != inDop1; final boolean dopChange2 = dop != inDop2; http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java index 8112748..c55d17a 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java @@ -31,7 +31,7 @@ import org.apache.flink.api.common.operators.base.BulkIterationBase; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler.InterestingPropertyVisitor; +import org.apache.flink.optimizer.Optimizer.InterestingPropertyVisitor; import org.apache.flink.optimizer.costs.CostEstimator; import org.apache.flink.optimizer.dag.WorksetIterationNode.SingleRootJoiner; import org.apache.flink.optimizer.dataproperties.GlobalProperties; @@ -62,9 +62,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode private OptimizerNode nextPartialSolution; - private PactConnection rootConnection; // connection out of the next partial solution + private DagConnection rootConnection; // connection out of the next partial solution - private PactConnection terminationCriterionRootConnection; // connection out of the term. criterion + private DagConnection terminationCriterionRootConnection; // connection out of the term. 
criterion private OptimizerNode singleRoot; @@ -93,7 +93,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode // -------------------------------------------------------------------------------------------- public BulkIterationBase<?> getIterationContract() { - return (BulkIterationBase<?>) getPactContract(); + return (BulkIterationBase<?>) getOperator(); } /** @@ -133,14 +133,14 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode // check if the root of the step function has the same DOP as the iteration // or if the step function has any operator at all - if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() || + if (nextPartialSolution.getParallelism() != getParallelism() || nextPartialSolution == partialSolution || nextPartialSolution instanceof BinaryUnionNode) { // add a no-op to the root to express the re-partitioning NoOpNode noop = new NoOpNode(); - noop.setDegreeOfParallelism(getDegreeOfParallelism()); + noop.setDegreeOfParallelism(getParallelism()); - PactConnection noOpConn = new PactConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED); + DagConnection noOpConn = new DagConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED); noop.setIncomingConnection(noOpConn); nextPartialSolution.addOutgoingConnection(noOpConn); @@ -152,13 +152,13 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode if (terminationCriterion == null) { this.singleRoot = nextPartialSolution; - this.rootConnection = new PactConnection(nextPartialSolution, ExecutionMode.PIPELINED); + this.rootConnection = new DagConnection(nextPartialSolution, ExecutionMode.PIPELINED); } else { // we have a termination criterion SingleRootJoiner singleRootJoiner = new SingleRootJoiner(); - this.rootConnection = new PactConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED); - this.terminationCriterionRootConnection = new PactConnection(terminationCriterion, singleRootJoiner, + this.rootConnection = new DagConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED); + this.terminationCriterionRootConnection = new DagConnection(terminationCriterion, singleRootJoiner, ExecutionMode.PIPELINED); singleRootJoiner.setInputs(this.rootConnection, this.terminationCriterionRootConnection); @@ -323,7 +323,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode locPropsReq.parameterizeChannel(toNoOp); UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST); - rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism()); + rebuildPropertiesNode.setDegreeOfParallelism(candidate.getParallelism()); SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP); rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties()); @@ -352,7 +352,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode // 5) Create a candidate for the Iteration Node for every remaining plan of the step function. 
if (terminationCriterion == null) { for (PlanNode candidate : candidates) { - BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate); + BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate); GlobalProperties gProps = candidate.getGlobalProperties().clone(); LocalProperties lProps = candidate.getLocalProperties().clone(); node.initProperties(gProps, lProps); @@ -367,7 +367,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode for (PlanNode candidate : candidates) { for (PlanNode terminationCandidate : terminationCriterionCandidates) { if (singleRoot.areBranchCompatible(candidate, terminationCandidate)) { - BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate, terminationCandidate); + BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate, terminationCandidate); GlobalProperties gProps = candidate.getGlobalProperties().clone(); LocalProperties lProps = candidate.getLocalProperties().clone(); node.initProperties(gProps, lProps); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java index a6e03ff..25a7eef 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.optimizer.dag; import java.util.Collections; @@ -49,7 +48,8 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode { if (this.cachedPlans != null) { throw new IllegalStateException(); } else { - this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, "PartialSolution ("+this.getPactContract().getName()+")", gProps, lProps, initialInput)); + this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, + "PartialSolution ("+this.getOperator().getName()+")", gProps, lProps, initialInput)); } } @@ -73,13 +73,14 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode { // -------------------------------------------------------------------------------------------- /** - * Gets the contract object for this data source node. + * Gets the operator (here the {@link PartialSolutionPlaceHolder}) that is represented by this + * optimizer node. * - * @return The contract. + * @return The operator represented by this optimizer node. 
*/ @Override - public PartialSolutionPlaceHolder<?> getPactContract() { - return (PartialSolutionPlaceHolder<?>) super.getPactContract(); + public PartialSolutionPlaceHolder<?> getOperator() { + return (PartialSolutionPlaceHolder<?>) super.getOperator(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java index 7c0cc9a..92076c3 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java @@ -50,8 +50,8 @@ public class CoGroupNode extends TwoInputNode { * @return The CoGroup operator. */ @Override - public CoGroupOperatorBase<?, ?, ?, ?> getPactContract() { - return (CoGroupOperatorBase<?, ?, ?, ?>) super.getPactContract(); + public CoGroupOperatorBase<?, ?, ?, ?> getOperator() { + return (CoGroupOperatorBase<?, ?, ?, ?>) super.getOperator(); } @Override @@ -85,7 +85,7 @@ public class CoGroupNode extends TwoInputNode { Ordering groupOrder1 = null; Ordering groupOrder2 = null; - CoGroupOperatorBase<?, ?, ?, ?> cgc = getPactContract(); + CoGroupOperatorBase<?, ?, ?, ?> cgc = getOperator(); groupOrder1 = cgc.getGroupOrderForInputOne(); groupOrder2 = cgc.getGroupOrderForInputTwo(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java index afeed1d..8de67e8 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java @@ -26,7 +26,7 @@ import org.apache.flink.api.common.operators.base.CrossOperatorBase; import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.operators.CrossBlockOuterFirstDescriptor; import org.apache.flink.optimizer.operators.CrossBlockOuterSecondDescriptor; import org.apache.flink.optimizer.operators.CrossStreamOuterFirstDescriptor; @@ -50,7 +50,7 @@ public class CrossNode extends TwoInputNode { super(operation); Configuration conf = operation.getParameters(); - String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null); + String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null); CrossHint hint = operation.getCrossHint(); @@ -60,13 +60,13 @@ public class CrossNode extends TwoInputNode { final boolean allowBCsecond = hint != CrossHint.FIRST_IS_SMALL; final OperatorDescriptorDual fixedDriverStrat; - if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) { + if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) { fixedDriverStrat = new CrossBlockOuterFirstDescriptor(allowBCfirst, allowBCsecond); - } else if 
(PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) { + } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) { fixedDriverStrat = new CrossBlockOuterSecondDescriptor(allowBCfirst, allowBCsecond); - } else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) { + } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) { fixedDriverStrat = new CrossStreamOuterFirstDescriptor(allowBCfirst, allowBCsecond); - } else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) { + } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) { fixedDriverStrat = new CrossStreamOuterSecondDescriptor(allowBCfirst, allowBCsecond); } else { throw new CompilerException("Invalid local strategy hint for cross contract: " + localStrategy); @@ -99,8 +99,8 @@ public class CrossNode extends TwoInputNode { // ------------------------------------------------------------------------ @Override - public CrossOperatorBase<?, ?, ?, ?> getPactContract() { - return (CrossOperatorBase<?, ?, ?, ?>) super.getPactContract(); + public CrossOperatorBase<?, ?, ?, ?> getOperator() { + return (CrossOperatorBase<?, ?, ?, ?>) super.getOperator(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java new file mode 100644 index 0000000..360f579 --- /dev/null +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.optimizer.dataproperties.InterestingProperties; +import org.apache.flink.optimizer.plandump.DumpableConnection; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; + +/** + * A connection between to operators. Represents an intermediate result + * and a data exchange between the two operators. + * + * The data exchange has a mode in which it performs (batch / pipelined). + * + * The data exchange strategy may be set on this connection, in which case + * it is fixed and will not be determined during candidate plan enumeration. + * + * During the enumeration of interesting properties, this connection also holds + * all interesting properties generated by the successor operator. 
+ */ +public class DagConnection implements EstimateProvider, DumpableConnection<OptimizerNode> { + + private final OptimizerNode source; // The source node of the connection + + private final OptimizerNode target; // The target node of the connection. + + private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange + + private InterestingProperties interestingProps; // local properties that succeeding nodes are interested in + + private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined. + + private TempMode materializationMode = TempMode.NONE; // the materialization mode + + private int maxDepth = -1; + + private boolean breakPipeline; // whether this connection should break the pipeline due to potential deadlocks + + /** + * Creates a new Connection between two nodes. The shipping strategy is by default <tt>NONE</tt>. + * The temp mode is by default <tt>NONE</tt>. + * + * @param source + * The source node. + * @param target + * The target node. + */ + public DagConnection(OptimizerNode source, OptimizerNode target, ExecutionMode exchangeMode) { + this(source, target, null, exchangeMode); + } + + /** + * Creates a new Connection between two nodes. + * + * @param source + * The source node. + * @param target + * The target node. + * @param shipStrategy + * The shipping strategy. + * @param exchangeMode + * The data exchange mode (pipelined / batch / batch only for shuffles / ... ) + */ + public DagConnection(OptimizerNode source, OptimizerNode target, + ShipStrategyType shipStrategy, ExecutionMode exchangeMode) + { + if (source == null || target == null) { + throw new NullPointerException("Source and target must not be null."); + } + this.source = source; + this.target = target; + this.shipStrategy = shipStrategy; + this.dataExchangeMode = exchangeMode; + } + + /** + * Constructor to create a result from an operator that is not + * consumed by another operator. + * + * @param source + * The source node. + */ + public DagConnection(OptimizerNode source, ExecutionMode exchangeMode) { + if (source == null) { + throw new NullPointerException("Source and target must not be null."); + } + this.source = source; + this.target = null; + this.shipStrategy = ShipStrategyType.NONE; + this.dataExchangeMode = exchangeMode; + } + + /** + * Gets the source of the connection. + * + * @return The source Node. + */ + public OptimizerNode getSource() { + return this.source; + } + + /** + * Gets the target of the connection. + * + * @return The target node. + */ + public OptimizerNode getTarget() { + return this.target; + } + + /** + * Gets the shipping strategy for this connection. + * + * @return The connection's shipping strategy. + */ + public ShipStrategyType getShipStrategy() { + return this.shipStrategy; + } + + /** + * Sets the shipping strategy for this connection. + * + * @param strategy + * The shipping strategy to be applied to this connection. + */ + public void setShipStrategy(ShipStrategyType strategy) { + this.shipStrategy = strategy; + } + + /** + * Gets the data exchange mode to use for this connection. + * + * @return The data exchange mode to use for this connection. + */ + public ExecutionMode getDataExchangeMode() { + if (dataExchangeMode == null) { + throw new IllegalStateException("This connection does not have the data exchange mode set"); + } + return dataExchangeMode; + } + + /** + * Marks that this connection should do a decoupled data exchange (such as batched) + * rather then pipeline data. 
Connections are marked as pipeline breakers to avoid + * deadlock situations. + */ + public void markBreaksPipeline() { + this.breakPipeline = true; + } + + /** + * Checks whether this connection is marked to break the pipeline. + * + * @return True, if this connection is marked to break the pipeline, false otherwise. + */ + public boolean isBreakingPipeline() { + return this.breakPipeline; + } + + /** + * Gets the interesting properties object for this pact connection. + * If the interesting properties for this connections have not yet been set, + * this method returns null. + * + * @return The collection of all interesting properties, or null, if they have not yet been set. + */ + public InterestingProperties getInterestingProperties() { + return this.interestingProps; + } + + /** + * Sets the interesting properties for this pact connection. + * + * @param props The interesting properties. + */ + public void setInterestingProperties(InterestingProperties props) { + if (this.interestingProps == null) { + this.interestingProps = props; + } else { + throw new IllegalStateException("Interesting Properties have already been set."); + } + } + + public void clearInterestingProperties() { + this.interestingProps = null; + } + + public void initMaxDepth() { + + if (this.maxDepth == -1) { + this.maxDepth = this.source.getMaxDepth() + 1; + } else { + throw new IllegalStateException("Maximum path depth has already been initialized."); + } + } + + public int getMaxDepth() { + if (this.maxDepth != -1) { + return this.maxDepth; + } else { + throw new IllegalStateException("Maximum path depth has not been initialized."); + } + } + + // -------------------------------------------------------------------------------------------- + // Estimates + // -------------------------------------------------------------------------------------------- + + @Override + public long getEstimatedOutputSize() { + return this.source.getEstimatedOutputSize(); + } + + @Override + public long getEstimatedNumRecords() { + return this.source.getEstimatedNumRecords(); + } + + @Override + public float getEstimatedAvgWidthPerOutputRecord() { + return this.source.getEstimatedAvgWidthPerOutputRecord(); + } + + // -------------------------------------------------------------------------------------------- + + + public TempMode getMaterializationMode() { + return this.materializationMode; + } + + public void setMaterializationMode(TempMode materializationMode) { + this.materializationMode = materializationMode; + } + + public boolean isOnDynamicPath() { + return this.source.isOnDynamicPath(); + } + + public int getCostWeight() { + return this.source.getCostWeight(); + } + + // -------------------------------------------------------------------------------------------- + + public String toString() { + StringBuilder buf = new StringBuilder(50); + buf.append("Connection: "); + + if (this.source == null) { + buf.append("null"); + } else { + buf.append(this.source.getOperator().getName()); + buf.append('(').append(this.source.getName()).append(')'); + } + + buf.append(" -> "); + + if (this.shipStrategy != null) { + buf.append('['); + buf.append(this.shipStrategy.name()); + buf.append(']').append(' '); + } + + if (this.target == null) { + buf.append("null"); + } else { + buf.append(this.target.getOperator().getName()); + buf.append('(').append(this.target.getName()).append(')'); + } + + return buf.toString(); + } +} 
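As the diffs in this commit show, DagConnection is the renamed PactConnection: it is constructed from a source node, an optional target node and ship strategy, and a mandatory ExecutionMode, and the predecessor registers it as an outgoing connection while the concrete successor keeps it as its incoming connection (see DataSinkNode.setInput(...) below). A minimal wiring sketch under those assumptions, using a hypothetical helper method rather than code from this commit:

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.optimizer.dag.DagConnection;
import org.apache.flink.optimizer.dag.OptimizerNode;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;

class ConnectionWiringSketch {

    // Hypothetical helper: connects a predecessor to a successor with a pipelined data exchange.
    static DagConnection connect(OptimizerNode source, OptimizerNode target) {
        // the data exchange mode is fixed at construction time
        DagConnection conn = new DagConnection(source, target, ExecutionMode.PIPELINED);

        // the ship strategy may be left unset (null) so that plan enumeration chooses it,
        // or pinned explicitly, as the optimizer does for forwarded union inputs
        conn.setShipStrategy(ShipStrategyType.FORWARD);

        // the predecessor tracks the connection as an outgoing edge; concrete successor
        // classes (e.g. single-input nodes, data sinks) store it as their incoming edge
        source.addOutgoingConnection(conn);
        return conn;
    }
}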
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java index 9e4f457..dbe04f4 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java @@ -46,7 +46,7 @@ import org.apache.flink.util.Visitor; */ public class DataSinkNode extends OptimizerNode { - protected PactConnection input; // The input edge + protected DagConnection input; // The input edge /** * Creates a new DataSinkNode for the given sink operator. @@ -64,7 +64,7 @@ public class DataSinkNode extends OptimizerNode { * * @return The input connection. */ - public PactConnection getInputConnection() { + public DagConnection getInputConnection() { return this.input; } @@ -87,8 +87,8 @@ public class DataSinkNode extends OptimizerNode { * @return The node's underlying operator. */ @Override - public GenericDataSinkBase<?> getPactContract() { - return (GenericDataSinkBase<?>) super.getPactContract(); + public GenericDataSinkBase<?> getOperator() { + return (GenericDataSinkBase<?>) super.getOperator(); } @Override @@ -97,7 +97,7 @@ public class DataSinkNode extends OptimizerNode { } @Override - public List<PactConnection> getIncomingConnections() { + public List<DagConnection> getIncomingConnections() { return Collections.singletonList(this.input); } @@ -107,19 +107,19 @@ public class DataSinkNode extends OptimizerNode { * @return An empty list. */ @Override - public List<PactConnection> getOutgoingConnections() { + public List<DagConnection> getOutgoingConnections() { return Collections.emptyList(); } @Override public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode) { - Operator<?> children = getPactContract().getInput(); + Operator<?> children = getOperator().getInput(); final OptimizerNode pred; - final PactConnection conn; + final DagConnection conn; pred = contractToNode.get(children); - conn = new PactConnection(pred, this, defaultExchangeMode); + conn = new DagConnection(pred, this, defaultExchangeMode); // create the connection and add it this.input = conn; @@ -141,8 +141,8 @@ public class DataSinkNode extends OptimizerNode { final InterestingProperties iProps = new InterestingProperties(); { - final Ordering partitioning = getPactContract().getPartitionOrdering(); - final DataDistribution dataDist = getPactContract().getDataDistribution(); + final Ordering partitioning = getOperator().getPartitionOrdering(); + final DataDistribution dataDist = getOperator().getDataDistribution(); final RequestedGlobalProperties partitioningProps = new RequestedGlobalProperties(); if (partitioning != null) { if(dataDist != null) { @@ -156,7 +156,7 @@ public class DataSinkNode extends OptimizerNode { } { - final Ordering localOrder = getPactContract().getLocalOrder(); + final Ordering localOrder = getOperator().getLocalOrder(); final RequestedLocalProperties orderProps = new RequestedLocalProperties(); if (localOrder != null) { orderProps.setOrdering(localOrder); @@ -184,7 +184,7 @@ public class DataSinkNode extends OptimizerNode { } @Override - protected List<UnclosedBranchDescriptor> getBranchesForParent(PactConnection parent) { + protected 
List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection parent) { // return our own stack of open branches, because nothing is added return this.openBranches; } @@ -204,8 +204,8 @@ public class DataSinkNode extends OptimizerNode { List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator); List<PlanNode> outputPlans = new ArrayList<PlanNode>(); - final int dop = getDegreeOfParallelism(); - final int inDop = getPredecessorNode().getDegreeOfParallelism(); + final int dop = getParallelism(); + final int inDop = getPredecessorNode().getParallelism(); final ExecutionMode executionMode = this.input.getDataExchangeMode(); final boolean dopChange = dop != inDop; @@ -224,7 +224,7 @@ public class DataSinkNode extends OptimizerNode { // no need to check whether the created properties meet what we need in case // of ordering or global ordering, because the only interesting properties we have // are what we require - outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getPactContract().getName()+")" ,c)); + outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getOperator().getName()+")" ,c)); } } }