http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java new file mode 100644 index 0000000..cbd58ca --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.base.JoinOperatorBase; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.operators.AbstractJoinDescriptor; +import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties; +import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual; +import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor; +import org.apache.flink.configuration.Configuration; + +/** + * The Optimizer representation of a join operator. + */ +public class JoinNode extends TwoInputNode { + + private List<OperatorDescriptorDual> dataProperties; + + /** + * Creates a new JoinNode for the given join operator. + * + * @param joinOperatorBase The join operator object. + */ + public JoinNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) { + super(joinOperatorBase); + + this.dataProperties = getDataProperties(joinOperatorBase, + joinOperatorBase.getJoinHint(), joinOperatorBase.getCustomPartitioner()); + } + + // ------------------------------------------------------------------------ + + /** + * Gets the operator for this join node. + * + * @return The join operator.
+ */ + @Override + public JoinOperatorBase<?, ?, ?, ?> getOperator() { + return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator(); + } + + @Override + public String getName() { + return "Join"; + } + + @Override + protected List<OperatorDescriptorDual> getPossibleProperties() { + return this.dataProperties; + } + + public void makeJoinWithSolutionSet(int solutionsetInputIndex) { + OperatorDescriptorDual op; + if (solutionsetInputIndex == 0) { + op = new HashJoinBuildFirstProperties(this.keys1, this.keys2); + } else if (solutionsetInputIndex == 1) { + op = new HashJoinBuildSecondProperties(this.keys1, this.keys2); + } else { + throw new IllegalArgumentException(); + } + + this.dataProperties = Collections.singletonList(op); + } + + /** + * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger + * input key domain. We also assume that every key from the larger input has one join partner in the smaller input. + * The result cardinality is hence the larger one. + */ + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + long card1 = getFirstPredecessorNode().getEstimatedNumRecords(); + long card2 = getSecondPredecessorNode().getEstimatedNumRecords(); + this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2); + + if (this.estimatedNumRecords >= 0) { + float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord(); + float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord(); + float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2; + + if (width > 0) { + this.estimatedOutputSize = (long) (width * this.estimatedNumRecords); + } + } + } + + private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint, + Partitioner<?> customPartitioner) + { + // see if an internal hint dictates the strategy to use + Configuration conf = joinOperatorBase.getParameters(); + String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null); + + if (localStrategy != null) { + final AbstractJoinDescriptor fixedDriverStrat; + if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) ) + { + fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2); + } + else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) { + fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2); + } + else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) { + fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2); + } + else { + throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy); + } + + if (customPartitioner != null) { + fixedDriverStrat.setCustomPartitioner(customPartitioner); + } + + ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>(); + list.add(fixedDriverStrat); + return list; + } + else { + ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>(); + + joinHint = joinHint == null ? 
JoinHint.OPTIMIZER_CHOOSES : joinHint; + + switch (joinHint) { + case BROADCAST_HASH_FIRST: + list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false)); + break; + case BROADCAST_HASH_SECOND: + list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false)); + break; + case REPARTITION_HASH_FIRST: + list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true)); + break; + case REPARTITION_HASH_SECOND: + list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true)); + break; + case REPARTITION_SORT_MERGE: + list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true)); + break; + case OPTIMIZER_CHOOSES: + list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2)); + list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2)); + list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2)); + break; + default: + throw new CompilerException("Unrecognized join hint: " + joinHint); + } + + if (customPartitioner != null) { + for (OperatorDescriptorDual descr : list) { + ((AbstractJoinDescriptor) descr).setCustomPartitioner(customPartitioner); + } + } + + return list; + } + } +}
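The switch in getDataProperties() above is where the user-facing join hints take effect: each explicit hint narrows the candidate set to a single strategy descriptor, while OPTIMIZER_CHOOSES enumerates all three variants and leaves the decision to the cost model. Below is a minimal sketch of how such a hint reaches the optimizer from the DataSet API; the input data sets and key positions are hypothetical, and it assumes a join(DataSet, JoinHint) variant of the API (older versions may also require an explicit env.execute() after print()).

    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class JoinHintExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // hypothetical inputs: (id, name) pairs and (id, value) pairs
            DataSet<Tuple2<Integer, String>> small = env.fromElements(new Tuple2<Integer, String>(1, "a"));
            DataSet<Tuple2<Integer, Integer>> large = env.fromElements(new Tuple2<Integer, Integer>(1, 42));

            // BROADCAST_HASH_FIRST makes the JoinNode emit only a
            // HashJoinBuildFirstProperties descriptor with the broadcast flag set,
            // instead of enumerating sort-merge and both hash variants
            small.join(large, JoinHint.BROADCAST_HASH_FIRST)
                 .where(0).equalTo(0)
                 .print();
        }
    }

The build side matters because the build input is materialized in the hash table; broadcasting and building the smaller input avoids repartitioning the larger one.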
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java new file mode 100644 index 0000000..35def59 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.SingleInputOperator; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.MapDescriptor; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; + +/** + * The optimizer's internal representation of a <i>Map</i> operator node. + */ +public class MapNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> possibleProperties; + + /** + * Creates a new MapNode for the given operator. + * + * @param operator The map operator. + */ + public MapNode(SingleInputOperator<?, ?, ?> operator) { + super(operator); + + this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapDescriptor()); + } + + @Override + public String getName() { + return "Map"; + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.possibleProperties; + } + + /** + * Computes the estimates for the Map operator. + * We assume that by default, Map takes one value and transforms it into another value. + * The cardinality consequently stays the same. + */ + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java new file mode 100644 index 0000000..b287c33 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.SingleInputOperator; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.MapPartitionDescriptor; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; + +/** + * The optimizer's internal representation of a <i>MapPartition</i> operator node. + */ +public class MapPartitionNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> possibleProperties; + + /** + * Creates a new MapPartitionNode for the given operator. + * + * @param operator The map partition operator object. + */ + public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) { + super(operator); + + this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor()); + } + + @Override + public String getName() { + return "MapPartition"; + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.possibleProperties; + } + + /** + * Computes the estimates for the MapPartition operator. + * Since a MapPartition function may emit an arbitrary number of records per partition, + * no default estimates can be made. + */ + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // we really cannot make any estimates here + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java new file mode 100644 index 0000000..de3cd22 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.optimizer.dag; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.base.JoinOperatorBase; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties; +import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual; +import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor; +import org.apache.flink.configuration.Configuration; + +/** + * The Optimizer representation of a join operator. + */ +public class MatchNode extends TwoInputNode { + + private List<OperatorDescriptorDual> dataProperties; + + /** + * Creates a new MatchNode for the given join operator. + * + * @param joinOperatorBase The join operator object. + */ + public MatchNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) { + super(joinOperatorBase); + this.dataProperties = getDataProperties(joinOperatorBase, joinOperatorBase.getJoinHint()); + } + + // ------------------------------------------------------------------------ + + /** + * Gets the contract object for this match node. + * + * @return The contract. + */ + @Override + public JoinOperatorBase<?, ?, ?, ?> getOperator() { + return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator(); + } + + @Override + public String getName() { + return "Join"; + } + + @Override + protected List<OperatorDescriptorDual> getPossibleProperties() { + return this.dataProperties; + } + + public void makeJoinWithSolutionSet(int solutionsetInputIndex) { + OperatorDescriptorDual op; + if (solutionsetInputIndex == 0) { + op = new HashJoinBuildFirstProperties(this.keys1, this.keys2); + } else if (solutionsetInputIndex == 1) { + op = new HashJoinBuildSecondProperties(this.keys1, this.keys2); + } else { + throw new IllegalArgumentException(); + } + + this.dataProperties = Collections.singletonList(op); + } + + /** + * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger + * input key domain. We also assume that every key from the larger input has one join partner in the smaller input. + * The result cardinality is hence the larger one. + */ + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + long card1 = getFirstPredecessorNode().getEstimatedNumRecords(); + long card2 = getSecondPredecessorNode().getEstimatedNumRecords(); + this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2); + + if (this.estimatedNumRecords >= 0) { + float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord(); + float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord(); + float width = (width1 <= 0 || width2 <= 0) ? 
-1 : width1 + width2; + + if (width > 0) { + this.estimatedOutputSize = (long) (width * this.estimatedNumRecords); + } + } + } + + private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint) { + // see if an internal hint dictates the strategy to use + Configuration conf = joinOperatorBase.getParameters(); + String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null); + + if (localStrategy != null) { + final OperatorDescriptorDual fixedDriverStrat; + if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) || + Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) ) + { + fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2); + } else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) { + fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2); + } else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) { + fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2); + } else { + throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy); + } + ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>(); + list.add(fixedDriverStrat); + return list; + } + else { + ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>(); + + joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint; + + switch (joinHint) { + case BROADCAST_HASH_FIRST: + list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false)); + break; + case BROADCAST_HASH_SECOND: + list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false)); + break; + case REPARTITION_HASH_FIRST: + list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true)); + break; + case REPARTITION_HASH_SECOND: + list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true)); + break; + case REPARTITION_SORT_MERGE: + list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true)); + break; + case OPTIMIZER_CHOOSES: + list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2)); + list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2)); + list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2)); + break; + default: + throw new CompilerException("Unrecognized join hint: " + joinHint); + } + + return list; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java new file mode 100644 index 0000000..76467cf --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.dag; + +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.NoOpDescriptor; + +/** + * The optimizer's internal representation of a <i>No Operation</i> node. + */ +public class NoOpNode extends UnaryOperatorNode { + + public NoOpNode() { + super("No Op", new FieldSet(), new NoOpDescriptor()); + } + + public NoOpNode(String name) { + super(name, new FieldSet(), new NoOpDescriptor()); + } + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); + this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java new file mode 100644 index 0000000..0cad34e --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java @@ -0,0 +1,1172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.dag; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.operators.AbstractUdfOperator; +import org.apache.flink.api.common.operators.CompilerHints; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.costs.CostEstimator; +import org.apache.flink.optimizer.dataproperties.InterestingProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.optimizer.plandump.DumpableConnection; +import org.apache.flink.optimizer.plandump.DumpableNode; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.util.Visitable; +import org.apache.flink.util.Visitor; + +/** + * The OptimizerNode is the base class of all nodes in the optimizer DAG. The optimizer DAG is the + * optimizer's representation of a program, created before the actual optimization (which creates different + * candidate plans and computes their cost). + * <p> + * Nodes in the DAG correspond (almost) one-to-one to the operators in a program. The optimizer DAG is constructed + * to hold the additional information that the optimizer needs: + * <ul> + * <li>Estimates of the data size processed by each operator</li> + * <li>Helper structures to track where the data flow "splits" and "joins", to support flows that are + * DAGs but not trees.</li> + * <li>Tags and weights to differentiate between loop-variant and -invariant parts of an iteration</li> + * <li>Interesting properties to be used during the enumeration of candidate plans</li> + * </ul> + */ +public abstract class OptimizerNode implements Visitable<OptimizerNode>, EstimateProvider, DumpableNode<OptimizerNode> { + + public static final int MAX_DYNAMIC_PATH_COST_WEIGHT = 100; + + // -------------------------------------------------------------------------------------------- + // Members + // -------------------------------------------------------------------------------------------- + + private final Operator<?> operator; // The operator (Reduce / Join / DataSource / ...)
+ + private List<String> broadcastConnectionNames = new ArrayList<String>(); // the names of the broadcast inputs of this node + + private List<DagConnection> broadcastConnections = new ArrayList<DagConnection>(); // the broadcast inputs of this node + + private List<DagConnection> outgoingConnections; // The links to succeeding nodes + + private InterestingProperties intProps; // the interesting properties of this node + + // --------------------------------- Branch Handling ------------------------------------------ + + protected List<UnclosedBranchDescriptor> openBranches; // stack of branches in the sub-graph that are not joined + + protected Set<OptimizerNode> closedBranchingNodes; // stack of branching nodes which have already been closed + + protected List<OptimizerNode> hereJoinedBranches; // the branching nodes (node with multiple outputs) + // that are partially joined (through multiple inputs or broadcast vars) + + // ---------------------------- Estimates and Annotations ------------------------------------- + + protected long estimatedOutputSize = -1; // the estimated size of the output (bytes) + + protected long estimatedNumRecords = -1; // the estimated number of key/value pairs in the output + + protected Set<FieldSet> uniqueFields; // set of attributes that will always be unique after this node + + // --------------------------------- General Parameters --------------------------------------- + + private int parallelism = -1; // the number of parallel instances of this node + + private long minimalMemoryPerSubTask = -1; + + protected int id = -1; // the id for this node. + + protected int costWeight = 1; // factor to weight the costs for dynamic paths + + protected boolean onDynamicPath; + + protected List<PlanNode> cachedPlans; // cached plan candidates, because they may be accessed repeatedly + + // ------------------------------------------------------------------------ + // Constructor / Setup + // ------------------------------------------------------------------------ + + /** + * Creates a new optimizer node that represents the given program operator. + * + * @param op The operator that the node represents. + */ + public OptimizerNode(Operator<?> op) { + this.operator = op; + readStubAnnotations(); + } + + protected OptimizerNode(OptimizerNode toCopy) { + this.operator = toCopy.operator; + this.intProps = toCopy.intProps; + + this.openBranches = toCopy.openBranches; + this.closedBranchingNodes = toCopy.closedBranchingNodes; + + this.estimatedOutputSize = toCopy.estimatedOutputSize; + this.estimatedNumRecords = toCopy.estimatedNumRecords; + + this.parallelism = toCopy.parallelism; + this.minimalMemoryPerSubTask = toCopy.minimalMemoryPerSubTask; + + this.id = toCopy.id; + this.costWeight = toCopy.costWeight; + this.onDynamicPath = toCopy.onDynamicPath; + } + + // ------------------------------------------------------------------------ + // Methods specific to unary- / binary- / special nodes + // ------------------------------------------------------------------------ + + /** + * Gets the name of this node, which is the name of the function/operator, or + * data source / data sink. + * + * @return The node name. + */ + public abstract String getName(); + + /** + * This function connects the predecessors to this operator. + * + * @param operatorToNode The map from program operators to optimizer nodes. + * @param defaultExchangeMode The data exchange mode to use, if the operator does not + * specify one.
+ */ + public abstract void setInput(Map<Operator<?>, OptimizerNode> operatorToNode, + ExecutionMode defaultExchangeMode); + + /** + * This function connects the operators that produce the broadcast inputs to this operator. + * + * @param operatorToNode The map from program operators to optimizer nodes. + * @param defaultExchangeMode The data exchange mode to use, if the operator does not + * specify one. + * + * @throws CompilerException + */ + public void setBroadcastInputs(Map<Operator<?>, OptimizerNode> operatorToNode, ExecutionMode defaultExchangeMode) { + // skip for Operators that don't support broadcast variables + if (!(getOperator() instanceof AbstractUdfOperator<?, ?>)) { + return; + } + + // get all broadcast inputs + AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, ?>) getOperator()); + + // create connections and add them + for (Map.Entry<String, Operator<?>> input : operator.getBroadcastInputs().entrySet()) { + OptimizerNode predecessor = operatorToNode.get(input.getValue()); + DagConnection connection = new DagConnection(predecessor, this, + ShipStrategyType.BROADCAST, defaultExchangeMode); + addBroadcastConnection(input.getKey(), connection); + predecessor.addOutgoingConnection(connection); + } + } + + /** + * Gets all incoming connections of this node. + * This method needs to be overridden by subclasses to return the children. + * + * @return The list of incoming connections. + */ + public abstract List<DagConnection> getIncomingConnections(); + + /** + * Tells the node to compute the interesting properties for its inputs. The interesting properties + * for the node itself must have been computed before. + * The node must then see how many of the interesting properties it preserves, and add its own. + * + * @param estimator The {@code CostEstimator} instance to use for plan cost estimation. + */ + public abstract void computeInterestingPropertiesForInputs(CostEstimator estimator); + + /** + * This method causes the node to compute the description of open branches in its sub-plan. An open branch + * describes that a (transitive) child node had multiple outputs, which have not all been re-joined in the + * sub-plan. This method needs to set the <code>openBranches</code> field to a stack of unclosed branches, the + * latest one on top. A branch is considered closed if some later node sees all of the branching node's outputs, + * no matter if there have been more branches to different paths in the meantime. + */ + public abstract void computeUnclosedBranchStack(); + + + protected List<UnclosedBranchDescriptor> computeUnclosedBranchStackForBroadcastInputs( + List<UnclosedBranchDescriptor> branchesSoFar) + { + // handle the data flow branching for the broadcast inputs + for (DagConnection broadcastInput : getBroadcastConnections()) { + OptimizerNode bcSource = broadcastInput.getSource(); + addClosedBranches(bcSource.closedBranchingNodes); + + List<UnclosedBranchDescriptor> bcBranches = bcSource.getBranchesForParent(broadcastInput); + + ArrayList<UnclosedBranchDescriptor> mergedBranches = new ArrayList<UnclosedBranchDescriptor>(); + mergeLists(branchesSoFar, bcBranches, mergedBranches, true); + branchesSoFar = mergedBranches.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : mergedBranches; + } + + return branchesSoFar; + } + + /** + * Computes the plan alternatives for this node, and implicitly for all nodes that are children of + * this node. This method must determine for each alternative the global and local properties + * and the costs.
This method may recursively call <code>getAlternativePlans()</code> on its children + * to get their plan alternatives, and build its own alternatives on top of those. + * + * @param estimator + * The cost estimator used to estimate the costs of each plan alternative. + * @return A list containing all plan alternatives. + */ + public abstract List<PlanNode> getAlternativePlans(CostEstimator estimator); + + /** + * This method implements the visit of a depth-first graph traversing visitor. Implementers must first + * call the <code>preVisit()</code> method, then hand the visitor to their children, and finally call + * the <code>postVisit()</code> method. + * + * @param visitor + * The graph traversing visitor. + * @see org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor) + */ + @Override + public abstract void accept(Visitor<OptimizerNode> visitor); + + public abstract SemanticProperties getSemanticProperties(); + + // ------------------------------------------------------------------------ + // Getters / Setters + // ------------------------------------------------------------------------ + + @Override + public Iterable<OptimizerNode> getPredecessors() { + List<OptimizerNode> allPredecessors = new ArrayList<OptimizerNode>(); + + for (DagConnection dagConnection : getIncomingConnections()) { + allPredecessors.add(dagConnection.getSource()); + } + + for (DagConnection conn : getBroadcastConnections()) { + allPredecessors.add(conn.getSource()); + } + + return allPredecessors; + } + + /** + * Gets the ID of this node. If the id has not yet been set, this method returns -1. + * + * @return This node's id, or -1, if not yet set. + */ + public int getId() { + return this.id; + } + + /** + * Sets the ID of this node. + * + * @param id + * The id for this node. + */ + public void initId(int id) { + if (id <= 0) { + throw new IllegalArgumentException(); + } + + if (this.id == -1) { + this.id = id; + } else { + throw new IllegalStateException("Id has already been initialized."); + } + } + + /** + * Adds the broadcast connection identified by the given {@code name} to this node. + * + * @param name The name under which the broadcast input is registered. + * @param broadcastConnection The connection to add. + */ + public void addBroadcastConnection(String name, DagConnection broadcastConnection) { + this.broadcastConnectionNames.add(name); + this.broadcastConnections.add(broadcastConnection); + } + + /** + * Return the list of names associated with broadcast inputs for this node. + */ + public List<String> getBroadcastConnectionNames() { + return this.broadcastConnectionNames; + } + + /** + * Return the list of inputs associated with broadcast variables for this node. + */ + public List<DagConnection> getBroadcastConnections() { + return this.broadcastConnections; + } + + /** + * Adds a new outgoing connection to this node. + * + * @param connection + * The connection to add. + */ + public void addOutgoingConnection(DagConnection connection) { + if (this.outgoingConnections == null) { + this.outgoingConnections = new ArrayList<DagConnection>(); + } else { + if (this.outgoingConnections.size() == 64) { + throw new CompilerException("Cannot currently handle nodes with more than 64 outputs."); + } + } + + this.outgoingConnections.add(connection); + } + + /** + * The list of outgoing connections from this node to succeeding tasks. + * + * @return The list of outgoing connections. + */ + public List<DagConnection> getOutgoingConnections() { + return this.outgoingConnections; + } + + /** + * Gets the operator represented by this optimizer node.
+ * + * @return This node's operator. + */ + public Operator<?> getOperator() { + return this.operator; + } + + /** + * Gets the parallelism for the operator represented by this optimizer node. + * The parallelism denotes how many parallel instances of the operator will be + * spawned during the execution. If this value is <code>-1</code>, then the system will take + * the default number of parallel instances. + * + * @return The parallelism of the operator. + */ + public int getParallelism() { + return this.parallelism; + } + + /** + * Sets the parallelism for this optimizer node. + * The parallelism denotes how many parallel instances of the operator will be + * spawned during the execution. If this value is set to <code>-1</code>, then the system will take + * the default number of parallel instances. + * + * @param parallelism The parallelism to set. + * @throws IllegalArgumentException If the parallelism is smaller than one and not -1. + */ + public void setDegreeOfParallelism(int parallelism) { + if (parallelism < 1 && parallelism != -1) { + throw new IllegalArgumentException("Degree of parallelism of " + parallelism + " is invalid."); + } + this.parallelism = parallelism; + } + + /** + * Gets the amount of memory that all subtasks of this task have jointly available. + * + * @return The total amount of memory across all subtasks. + */ + public long getMinimalMemoryAcrossAllSubTasks() { + return this.minimalMemoryPerSubTask == -1 ? -1 : this.minimalMemoryPerSubTask * this.parallelism; + } + + public boolean isOnDynamicPath() { + return this.onDynamicPath; + } + + public void identifyDynamicPath(int costWeight) { + boolean anyDynamic = false; + boolean allDynamic = true; + + for (DagConnection conn : getIncomingConnections()) { + boolean dynamicIn = conn.isOnDynamicPath(); + anyDynamic |= dynamicIn; + allDynamic &= dynamicIn; + } + + for (DagConnection conn : getBroadcastConnections()) { + boolean dynamicIn = conn.isOnDynamicPath(); + anyDynamic |= dynamicIn; + allDynamic &= dynamicIn; + } + + if (anyDynamic) { + this.onDynamicPath = true; + this.costWeight = costWeight; + if (!allDynamic) { + // this node joins static and dynamic path. + // mark the connections where the source is not dynamic as cached + for (DagConnection conn : getIncomingConnections()) { + if (!conn.getSource().isOnDynamicPath()) { + conn.setMaterializationMode(conn.getMaterializationMode().makeCached()); + } + } + + // broadcast variables are always cached, because they stay unchanged and available in the + // runtime context of the functions + } + } + } + + public int getCostWeight() { + return this.costWeight; + } + + public int getMaxDepth() { + int maxDepth = 0; + for (DagConnection conn : getIncomingConnections()) { + maxDepth = Math.max(maxDepth, conn.getMaxDepth()); + } + for (DagConnection conn : getBroadcastConnections()) { + maxDepth = Math.max(maxDepth, conn.getMaxDepth()); + } + + return maxDepth; + } + + /** + * Gets the properties that are interesting for this node to produce. + * + * @return The interesting properties for this node, or null, if not yet computed.
+ */ + public InterestingProperties getInterestingProperties() { + return this.intProps; + } + + @Override + public long getEstimatedOutputSize() { + return this.estimatedOutputSize; + } + + @Override + public long getEstimatedNumRecords() { + return this.estimatedNumRecords; + } + + public void setEstimatedOutputSize(long estimatedOutputSize) { + this.estimatedOutputSize = estimatedOutputSize; + } + + public void setEstimatedNumRecords(long estimatedNumRecords) { + this.estimatedNumRecords = estimatedNumRecords; + } + + @Override + public float getEstimatedAvgWidthPerOutputRecord() { + if (this.estimatedOutputSize > 0 && this.estimatedNumRecords > 0) { + return ((float) this.estimatedOutputSize) / this.estimatedNumRecords; + } else { + return -1.0f; + } + } + + /** + * Checks whether this node has branching output. A node's output is branched if it has more + * than one output connection. + * + * @return True, if the node's output branches. False otherwise. + */ + public boolean isBranching() { + return getOutgoingConnections() != null && getOutgoingConnections().size() > 1; + } + + public void markAllOutgoingConnectionsAsPipelineBreaking() { + if (this.outgoingConnections == null) { + throw new IllegalStateException("The outgoing connections have not yet been initialized."); + } + for (DagConnection conn : getOutgoingConnections()) { + conn.markBreaksPipeline(); + } + } + + // ------------------------------------------------------------------------ + // Miscellaneous + // ------------------------------------------------------------------------ + + /** + * Checks if all outgoing connections have their interesting properties set from their target nodes. + * + * @return True, if on all outgoing connections, the interesting properties are set. False otherwise. + */ + public boolean haveAllOutputConnectionInterestingProperties() { + for (DagConnection conn : getOutgoingConnections()) { + if (conn.getInterestingProperties() == null) { + return false; + } + } + return true; + } + + /** + * Computes all the interesting properties that are relevant to this node. The interesting + * properties are a union of the interesting properties on each outgoing connection. + * However, if two interesting properties on the outgoing connections overlap, + * the interesting properties will occur only once in this set. To that end, this + * method deduplicates and merges the interesting properties. + * This method returns copies of the original interesting properties objects and + * leaves the original objects, contained by the connections, unchanged. + */ + public void computeUnionOfInterestingPropertiesFromSuccessors() { + List<DagConnection> conns = getOutgoingConnections(); + if (conns.size() == 0) { + // no outgoing connections, so we have no interesting properties ourselves + this.intProps = new InterestingProperties(); + } else { + this.intProps = conns.get(0).getInterestingProperties().clone(); + for (int i = 1; i < conns.size(); i++) { + this.intProps.addInterestingProperties(conns.get(i).getInterestingProperties()); + } + } + this.intProps.dropTrivials(); + } + + public void clearInterestingProperties() { + this.intProps = null; + for (DagConnection conn : getIncomingConnections()) { + conn.clearInterestingProperties(); + } + for (DagConnection conn : getBroadcastConnections()) { + conn.clearInterestingProperties(); + } + } + + /** + * Causes this node to compute its output estimates (such as number of rows, size in bytes) + * based on the inputs and the compiler hints.
The compiler hints are instantiated with conservative + * default values which are used if no other values are provided. Nodes may access the statistics to + * determine relevant information. + * + * @param statistics + * The statistics object which may be accessed to get statistical information. + * The parameter may be null, if no statistics are available. + */ + public void computeOutputEstimates(DataStatistics statistics) { + // sanity checking + for (DagConnection c : getIncomingConnections()) { + if (c.getSource() == null) { + throw new CompilerException("Bug: Estimate computation called before inputs have been set."); + } + } + + // let every operator do its computation + computeOperatorSpecificDefaultEstimates(statistics); + + if (this.estimatedOutputSize < 0) { + this.estimatedOutputSize = -1; + } + if (this.estimatedNumRecords < 0) { + this.estimatedNumRecords = -1; + } + + // overwrite default estimates with hints, if given + if (getOperator() == null || getOperator().getCompilerHints() == null) { + return ; + } + + CompilerHints hints = getOperator().getCompilerHints(); + if (hints.getOutputSize() >= 0) { + this.estimatedOutputSize = hints.getOutputSize(); + } + + if (hints.getOutputCardinality() >= 0) { + this.estimatedNumRecords = hints.getOutputCardinality(); + } + + if (hints.getFilterFactor() >= 0.0f) { + if (this.estimatedNumRecords >= 0) { + this.estimatedNumRecords = (long) (this.estimatedNumRecords * hints.getFilterFactor()); + + if (this.estimatedOutputSize >= 0) { + this.estimatedOutputSize = (long) (this.estimatedOutputSize * hints.getFilterFactor()); + } + } + else if (this instanceof SingleInputNode) { + OptimizerNode pred = ((SingleInputNode) this).getPredecessorNode(); + if (pred != null && pred.getEstimatedNumRecords() >= 0) { + this.estimatedNumRecords = (long) (pred.getEstimatedNumRecords() * hints.getFilterFactor()); + } + } + } + + // use the width to infer the cardinality (given size) and vice versa + if (hints.getAvgOutputRecordSize() >= 1) { + // the estimated number of rows based on size + if (this.estimatedNumRecords == -1 && this.estimatedOutputSize >= 0) { + this.estimatedNumRecords = (long) (this.estimatedOutputSize / hints.getAvgOutputRecordSize()); + } + else if (this.estimatedOutputSize == -1 && this.estimatedNumRecords >= 0) { + this.estimatedOutputSize = (long) (this.estimatedNumRecords * hints.getAvgOutputRecordSize()); + } + } + } + + protected abstract void computeOperatorSpecificDefaultEstimates(DataStatistics statistics); + + // ------------------------------------------------------------------------ + // Reading of stub annotations + // ------------------------------------------------------------------------ + + /** + * Reads all stub annotations, i.e. which fields remain constant, what cardinality bounds the + * functions have, which fields remain unique. 
+ */ + protected void readStubAnnotations() { + readUniqueFieldsAnnotation(); + } + + protected void readUniqueFieldsAnnotation() { + if (this.operator.getCompilerHints() != null) { + Set<FieldSet> uniqueFieldSets = operator.getCompilerHints().getUniqueFields(); + if (uniqueFieldSets != null) { + if (this.uniqueFields == null) { + this.uniqueFields = new HashSet<FieldSet>(); + } + this.uniqueFields.addAll(uniqueFieldSets); + } + } + } + + // ------------------------------------------------------------------------ + // Access of stub annotations + // ------------------------------------------------------------------------ + + /** + * Gets the FieldSets which are unique in the output of the node. + */ + public Set<FieldSet> getUniqueFields() { + return this.uniqueFields == null ? Collections.<FieldSet>emptySet() : this.uniqueFields; + } + + // -------------------------------------------------------------------------------------------- + // Pruning + // -------------------------------------------------------------------------------------------- + + protected void prunePlanAlternatives(List<PlanNode> plans) { + if (plans.isEmpty()) { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints."); + } + // shortcut for the simple case + if (plans.size() == 1) { + return; + } + + // we can only compare plan candidates that made equal choices + // at the branching points. for each choice at a branching point, + // we need to keep the cheapest (wrt. interesting properties). + // if we do not keep candidates for each branch choice, we might not + // find branch compatible candidates when joining the branches back. + + // for pruning, we are quasi AFTER the node, so in the presence of + // branches, we need to form the per-branch-choice groups by the choice + // they made at the latest un-joined branching node. Note that this is + // different from the check for branch compatibility of candidates, as + // this happens on the input sub-plans and hence BEFORE the node (therefore + // it is relevant to find the latest (partially) joined branch point).
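+ // Illustration (hypothetical choices): with open branch points A and B, candidates + // that chose (A1, B1), (A1, B2), and (A2, B1) fall into three separate groups; the + // pruning below keeps the cheapest plan per interesting property within each group, + // so that branch-compatible alternatives survive for the later re-join.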
+ + if (this.openBranches == null || this.openBranches.isEmpty()) { + prunePlanAlternativesWithCommonBranching(plans); + } else { + // partition the candidates into groups that made the same sub-plan candidate + // choice at the latest unclosed branch point + + final OptimizerNode[] branchDeterminers = new OptimizerNode[this.openBranches.size()]; + + for (int i = 0; i < branchDeterminers.length; i++) { + branchDeterminers[i] = this.openBranches.get(this.openBranches.size() - 1 - i).getBranchingNode(); + } + + // this sorter sorts by the candidate choice at the branch point + Comparator<PlanNode> sorter = new Comparator<PlanNode>() { + + @Override + public int compare(PlanNode o1, PlanNode o2) { + for (OptimizerNode branchDeterminer : branchDeterminers) { + PlanNode n1 = o1.getCandidateAtBranchPoint(branchDeterminer); + PlanNode n2 = o2.getCandidateAtBranchPoint(branchDeterminer); + int hash1 = System.identityHashCode(n1); + int hash2 = System.identityHashCode(n2); + + if (hash1 != hash2) { + return hash1 - hash2; + } + } + return 0; + } + }; + Collections.sort(plans, sorter); + + List<PlanNode> result = new ArrayList<PlanNode>(); + List<PlanNode> turn = new ArrayList<PlanNode>(); + + final PlanNode[] determinerChoice = new PlanNode[branchDeterminers.length]; + + while (!plans.isEmpty()) { + // take one as the determiner + turn.clear(); + PlanNode determiner = plans.remove(plans.size() - 1); + turn.add(determiner); + + for (int i = 0; i < determinerChoice.length; i++) { + determinerChoice[i] = determiner.getCandidateAtBranchPoint(branchDeterminers[i]); + } + + // go backwards through the plans and find all that are equal + boolean stillEqual = true; + for (int k = plans.size() - 1; k >= 0 && stillEqual; k--) { + PlanNode toCheck = plans.get(k); + + for (int i = 0; i < branchDeterminers.length; i++) { + PlanNode checkerChoice = toCheck.getCandidateAtBranchPoint(branchDeterminers[i]); + + if (checkerChoice != determinerChoice[i]) { + // not the same anymore + stillEqual = false; + break; + } + } + + if (stillEqual) { + // the same + plans.remove(k); + turn.add(toCheck); + } + } + + // now that we have only plans with the same branch alternatives, prune! + if (turn.size() > 1) { + prunePlanAlternativesWithCommonBranching(turn); + } + result.addAll(turn); + } + + // after all turns are complete + plans.clear(); + plans.addAll(result); + } + } + + protected void prunePlanAlternativesWithCommonBranching(List<PlanNode> plans) { + // for each interesting property, which plans are cheapest + final RequestedGlobalProperties[] gps = this.intProps.getGlobalProperties().toArray( + new RequestedGlobalProperties[this.intProps.getGlobalProperties().size()]); + final RequestedLocalProperties[] lps = this.intProps.getLocalProperties().toArray( + new RequestedLocalProperties[this.intProps.getLocalProperties().size()]); + + final PlanNode[][] toKeep = new PlanNode[gps.length][]; + final PlanNode[] cheapestForGlobal = new PlanNode[gps.length]; + + + PlanNode cheapest = null; // the overall cheapest plan + + // go over all plans from the list + for (PlanNode candidate : plans) { + // check if that plan is the overall cheapest + if (cheapest == null || (cheapest.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) { + cheapest = candidate; + } + + // find the interesting global properties that this plan matches + for (int i = 0; i < gps.length; i++) { + if (gps[i].isMetBy(candidate.getGlobalProperties())) { + // the candidate meets the global property requirements. 
That means + // it has a chance that its local properties are re-used (they would be + // destroyed if global properties need to be established) + + if (cheapestForGlobal[i] == null || (cheapestForGlobal[i].getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) { + cheapestForGlobal[i] = candidate; + } + + final PlanNode[] localMatches; + if (toKeep[i] == null) { + localMatches = new PlanNode[lps.length]; + toKeep[i] = localMatches; + } else { + localMatches = toKeep[i]; + } + + for (int k = 0; k < lps.length; k++) { + if (lps[k].isMetBy(candidate.getLocalProperties())) { + final PlanNode previous = localMatches[k]; + if (previous == null || previous.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0) { + // this one is cheaper! + localMatches[k] = candidate; + } + } + } + } + } + } + + // all plans are set now + plans.clear(); + + // add the cheapest plan + if (cheapest != null) { + plans.add(cheapest); + cheapest.setPruningMarker(); // remember that this plan is in the set + } + + // add all others, which are optimal for some interesting properties + for (int i = 0; i < gps.length; i++) { + if (toKeep[i] != null) { + final PlanNode[] localMatches = toKeep[i]; + for (final PlanNode n : localMatches) { + if (n != null && !n.isPruneMarkerSet()) { + n.setPruningMarker(); + plans.add(n); + } + } + } + if (cheapestForGlobal[i] != null) { + final PlanNode n = cheapestForGlobal[i]; + if (!n.isPruneMarkerSet()) { + n.setPruningMarker(); + plans.add(n); + } + } + } + } + + + // -------------------------------------------------------------------------------------------- + // Handling of branches + // -------------------------------------------------------------------------------------------- + + public boolean hasUnclosedBranches() { + return this.openBranches != null && !this.openBranches.isEmpty(); + } + + public Set<OptimizerNode> getClosedBranchingNodes() { + return this.closedBranchingNodes; + } + + public List<UnclosedBranchDescriptor> getOpenBranches() { + return this.openBranches; + } + + + protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection toParent) { + if (this.outgoingConnections.size() == 1) { + // return our own stack of open branches, because nothing is added + if (this.openBranches == null || this.openBranches.isEmpty()) { + return Collections.emptyList(); + } else { + return new ArrayList<UnclosedBranchDescriptor>(this.openBranches); + } + } + else if (this.outgoingConnections.size() > 1) { + // we branch; add branch info to the stack + List<UnclosedBranchDescriptor> branches = new ArrayList<UnclosedBranchDescriptor>(4); + if (this.openBranches != null) { + branches.addAll(this.openBranches); + } + + // find out which output number the connection to the parent has + int num; + for (num = 0; num < this.outgoingConnections.size(); num++) { + if (this.outgoingConnections.get(num) == toParent) { + break; + } + } + if (num >= this.outgoingConnections.size()) { + throw new CompilerException("Error in compiler: " + + "Parent to get branch info for is not contained in the outgoing connections."); + } + + // create the description and add it + long bitvector = 0x1L << num; + branches.add(new UnclosedBranchDescriptor(this, bitvector)); + return branches; + } + else { + throw new CompilerException( + "Error in compiler: Cannot get branch info for successor in a node with no successors."); + } + } + + + protected void removeClosedBranches(List<UnclosedBranchDescriptor> openList) { + if (openList == null || openList.isEmpty() ||
this.closedBranchingNodes == null || this.closedBranchingNodes.isEmpty()) { + return; + } + + Iterator<UnclosedBranchDescriptor> it = openList.iterator(); + while (it.hasNext()) { + if (this.closedBranchingNodes.contains(it.next().getBranchingNode())) { + // this branch was already closed --> remove it from the list + it.remove(); + } + } + } + + protected void addClosedBranches(Set<OptimizerNode> alreadyClosed) { + if (alreadyClosed == null || alreadyClosed.isEmpty()) { + return; + } + + if (this.closedBranchingNodes == null) { + this.closedBranchingNodes = new HashSet<OptimizerNode>(alreadyClosed); + } else { + this.closedBranchingNodes.addAll(alreadyClosed); + } + } + + protected void addClosedBranch(OptimizerNode alreadyClosed) { + if (this.closedBranchingNodes == null) { + this.closedBranchingNodes = new HashSet<OptimizerNode>(); + } + + this.closedBranchingNodes.add(alreadyClosed); + } + + /** + * Checks whether two candidate plans for the sub-plan of this node are comparable. The two + * alternative plans are comparable if + * + * a) There is no branch in the sub-plan of this node + * b) Both candidates have the same candidate as the child at the last open branch. + * + * @param plan1 The root node of the first candidate plan. + * @param plan2 The root node of the second candidate plan. + * @return True if the nodes are branch compatible in the inputs. + */ + protected boolean areBranchCompatible(PlanNode plan1, PlanNode plan2) { + if (plan1 == null || plan2 == null) { + throw new NullPointerException(); + } + + // if there is no open branch, the children are always compatible. + // in most plans, that will be the dominant case + if (this.hereJoinedBranches == null || this.hereJoinedBranches.isEmpty()) { + return true; + } + + for (OptimizerNode joinedBrancher : hereJoinedBranches) { + final PlanNode branch1Cand = plan1.getCandidateAtBranchPoint(joinedBrancher); + final PlanNode branch2Cand = plan2.getCandidateAtBranchPoint(joinedBrancher); + + if (branch1Cand != null && branch2Cand != null && branch1Cand != branch2Cand) { + return false; + } + } + return true; + } + + /** + * The node IDs are assigned in graph-traversal order (pre-order), hence each list is + * sorted by ID in ascending order and all consecutive lists start with IDs in ascending order. + * + * @param markJoinedBranchesAsPipelineBreaking True, if the branches that are re-joined at this node + * should have their outgoing connections marked as pipeline breaking. + */ + protected final boolean mergeLists(List<UnclosedBranchDescriptor> child1open, + List<UnclosedBranchDescriptor> child2open, + List<UnclosedBranchDescriptor> result, + boolean markJoinedBranchesAsPipelineBreaking) { + + //remove branches which have already been closed + removeClosedBranches(child1open); + removeClosedBranches(child2open); + + result.clear(); + + // check how many open branches we have. the cases: + // 1) if both are null or empty, the result is null + // 2) if one side is null (or empty), the result is the other side. + // 3) both are set, then we need to merge.
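+ // Worked example (hypothetical): a branching node with 3 outgoing connections + // yields allInputs = (0x1L << 3) - 1 = 0b111 further below. If one child path + // covers output 0 (vector 0b001) and the other covers outputs 1 and 2 (vector + // 0b110), the merged vector 0b001 | 0b110 == 0b111 closes the branch.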
+ if (child1open == null || child1open.isEmpty()) { + if (child2open != null && !child2open.isEmpty()) { + result.addAll(child2open); + } + return false; + } + + if (child2open == null || child2open.isEmpty()) { + result.addAll(child1open); + return false; + } + + int index1 = child1open.size() - 1; + int index2 = child2open.size() - 1; + + boolean didCloseABranch = false; + + // as both lists (child1open and child2open) are sorted in ascending ID order, + // we can do a merge-join-like loop which preserves the order in the result list + // and eliminates duplicates + while (index1 >= 0 || index2 >= 0) { + int id1 = -1; + int id2 = index2 >= 0 ? child2open.get(index2).getBranchingNode().getId() : -1; + + while (index1 >= 0 && (id1 = child1open.get(index1).getBranchingNode().getId()) > id2) { + result.add(child1open.get(index1)); + index1--; + } + while (index2 >= 0 && (id2 = child2open.get(index2).getBranchingNode().getId()) > id1) { + result.add(child2open.get(index2)); + index2--; + } + + // match: they share a common branching child + if (id1 == id2) { + didCloseABranch = true; + + // both lists reference this branching node; remember it + OptimizerNode currBranchingNode = child1open.get(index1).getBranchingNode(); + + long vector1 = child1open.get(index1).getJoinedPathsVector(); + long vector2 = child2open.get(index2).getJoinedPathsVector(); + + // check if this is the same descriptor (meaning that it contains the same paths): + // if it is the same, add it only once, otherwise process the join of the paths + if (vector1 == vector2) { + result.add(child1open.get(index1)); + } + else { + // we merge (re-join) a branch + + // mark the branch as a point where we break the pipeline + if (markJoinedBranchesAsPipelineBreaking) { + currBranchingNode.markAllOutgoingConnectionsAsPipelineBreaking(); + } + + if (this.hereJoinedBranches == null) { + this.hereJoinedBranches = new ArrayList<OptimizerNode>(2); + } + this.hereJoinedBranches.add(currBranchingNode); + + // see if this node closes the branch + long joinedInputs = vector1 | vector2; + + // this is 2^size - 1, which is all bits set at positions 0..size-1 + long allInputs = (0x1L << currBranchingNode.getOutgoingConnections().size()) - 1; + + if (joinedInputs == allInputs) { + // closed - we can remove it from the stack + addClosedBranch(currBranchingNode); + } else { + // not quite closed + result.add(new UnclosedBranchDescriptor(currBranchingNode, joinedInputs)); + } + } + + index1--; + index2--; + } + } + + // merged. now reverse the list, because the elements were added in reverse order + Collections.reverse(result); + return didCloseABranch; + } + + @Override + public OptimizerNode getOptimizerNode() { + return this; + } + + @Override + public PlanNode getPlanNode() { + return null; + } + + @Override + public Iterable<DumpableConnection<OptimizerNode>> getDumpableInputs() { + List<DumpableConnection<OptimizerNode>> allInputs = new ArrayList<DumpableConnection<OptimizerNode>>(); + + allInputs.addAll(getIncomingConnections()); + allInputs.addAll(getBroadcastConnections()); + + return allInputs; + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + + bld.append(getName()); + bld.append(" (").append(getOperator().getName()).append(") "); + + int i = 1; + for (DagConnection conn : getIncomingConnections()) { + String shipStrategyName = conn.getShipStrategy() == null ?
"null" : conn.getShipStrategy().name(); + bld.append('(').append(i++).append(":").append(shipStrategyName).append(')'); + } + + return bld.toString(); + } + + // -------------------------------------------------------------------------------------------- + + /** + * Description of an unclosed branch. An unclosed branch is when the data flow branched (one operator's + * result is consumed by multiple targets), but these different branches (targets) have not been joined + * together. + */ + public static final class UnclosedBranchDescriptor { + + protected OptimizerNode branchingNode; + + protected long joinedPathsVector; + + /** + * Creates a new branching descriptor. + * + * @param branchingNode The node where the branch occurred (teh node with multiple outputs). + * @param joinedPathsVector A bit vector describing which branches are tracked by this descriptor. + * The bit vector is one, where the branch is tracked, zero otherwise. + */ + protected UnclosedBranchDescriptor(OptimizerNode branchingNode, long joinedPathsVector) { + this.branchingNode = branchingNode; + this.joinedPathsVector = joinedPathsVector; + } + + public OptimizerNode getBranchingNode() { + return this.branchingNode; + } + + public long getJoinedPathsVector() { + return this.joinedPathsVector; + } + + @Override + public String toString() { + return "(" + this.branchingNode.getOperator() + ") [" + this.joinedPathsVector + "]"; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java new file mode 100644 index 0000000..5c811b0 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.base.PartitionOperatorBase; +import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +/** + * The optimizer's internal representation of a <i>Partition</i> operator node. + */ +public class PartitionNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> possibleProperties; + + public PartitionNode(PartitionOperatorBase<?> operator) { + super(operator); + + OperatorDescriptorSingle descr = new PartitionDescriptor( + this.getOperator().getPartitionMethod(), this.keys, operator.getCustomPartitioner()); + this.possibleProperties = Collections.singletonList(descr); + } + + @Override + public PartitionOperatorBase<?> getOperator() { + return (PartitionOperatorBase<?>) super.getOperator(); + } + + @Override + public String getName() { + return "Partition"; + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.possibleProperties; + } + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // partitioning does not change the number of records or their size + this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); + this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize(); + } + + @Override + public SemanticProperties getSemanticProperties() { + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); + } + + // -------------------------------------------------------------------------------------------- + + public static class PartitionDescriptor extends OperatorDescriptorSingle { + + private final PartitionMethod pMethod; + private final Partitioner<?> customPartitioner; + + public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys, Partitioner<?> customPartitioner) { + super(pKeys); + + this.pMethod = pMethod; + this.customPartitioner = customPartitioner; + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.UNARY_NO_OP; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + return new SingleInputPlanNode(node, "Partition", in, DriverStrategy.UNARY_NO_OP); + } + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + RequestedGlobalProperties rgps = new RequestedGlobalProperties(); + + switch (this.pMethod) { + case HASH: + rgps.setHashPartitioned(this.keys); + break; + case REBALANCE: + rgps.setForceRebalancing(); + break; + case CUSTOM: + rgps.setCustomPartitioned(this.keys, this.customPartitioner); + break;
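+ // Illustrative note (hypothetical user code): a DataSet API call such as + // data.partitionCustom(myPartitioner, 0) arrives here as PartitionMethod.CUSTOM and is + // translated into a request for custom partitioning of the key fields with the user's + // Partitioner instance.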
+ case RANGE: + throw new UnsupportedOperationException("Not yet supported"); + default: + throw new IllegalArgumentException("Invalid partition method"); + } + + return Collections.singletonList(rgps); + } + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + // partitioning does not require any local property. + return Collections.singletonList(new RequestedLocalProperties()); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + // the partition node is a no-op operation, so all global properties are preserved. + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + // the partition node is a no-op operation, so all local properties are preserved. + return lProps; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java new file mode 100644 index 0000000..bbe4607 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import org.apache.flink.util.Visitor; + +final class PlanCacheCleaner implements Visitor<OptimizerNode> { + + static final PlanCacheCleaner INSTANCE = new PlanCacheCleaner();
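+ + // Usage sketch (illustrative; assumes OptimizerNode implements Visitable<OptimizerNode>, + // so a traversal can be started via accept): + // + // rootNode.accept(PlanCacheCleaner.INSTANCE); + // + // preVisit below returns true only for nodes whose cache was actually cleared, so the + // traversal descends only along the dynamic path, where cached plans may have become stale.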
+ + @Override + public boolean preVisit(OptimizerNode visitable) { + if (visitable.cachedPlans != null && visitable.isOnDynamicPath()) { + visitable.cachedPlans = null; + return true; + } else { + return false; + } + } + + @Override + public void postVisit(OptimizerNode visitable) {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java new file mode 100644 index 0000000..1477038 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.base.ReduceOperatorBase; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.AllReduceProperties; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; +import org.apache.flink.optimizer.operators.ReduceProperties; + +/** + * The Optimizer representation of a <i>Reduce</i> operator. + */ +public class ReduceNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> possibleProperties; + + private ReduceNode preReduceUtilityNode; + + + public ReduceNode(ReduceOperatorBase<?, ?> operator) { + super(operator); + + if (this.keys == null) { + // case of a key-less reducer; force a parallelism of 1 + setDegreeOfParallelism(1); + } + + OperatorDescriptorSingle props = this.keys == null ? + new AllReduceProperties() : + new ReduceProperties(this.keys, operator.getCustomPartitioner()); + + this.possibleProperties = Collections.singletonList(props); + } + + public ReduceNode(ReduceNode reducerToCopyForCombiner) { + super(reducerToCopyForCombiner); + + this.possibleProperties = Collections.emptyList(); + } + + // ------------------------------------------------------------------------ + + @Override + public ReduceOperatorBase<?, ?> getOperator() { + return (ReduceOperatorBase<?, ?>) super.getOperator(); + } + + @Override + public String getName() { + return "Reduce"; + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.possibleProperties; + } + + // -------------------------------------------------------------------------------------------- + // Estimates + // -------------------------------------------------------------------------------------------- + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // no real estimates are possible for a reducer. + } + + public ReduceNode getCombinerUtilityNode() { + if (this.preReduceUtilityNode == null) { + this.preReduceUtilityNode = new ReduceNode(this); + + // we conservatively assume the combiner returns the same data size as it consumes + this.preReduceUtilityNode.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize(); + this.preReduceUtilityNode.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); + } + return this.preReduceUtilityNode; + } +}
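For illustration, the key-less branch in the ReduceNode constructor above corresponds to a non-grouped reduce in the DataSet API. A minimal sketch of such a program (hypothetical user code; assumes an ExecutionEnvironment env and the usual DataSet and ReduceFunction imports):

    // Without a preceding groupBy, the reduce has no keys: the optimizer picks
    // AllReduceProperties and forces the final reduce step to parallelism 1.
    DataSet<Long> values = env.fromElements(1L, 2L, 3L);
    DataSet<Long> sum = values.reduce(new ReduceFunction<Long>() {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    });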