http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java new file mode 100644 index 0000000..cc12bb8 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.optimizer.dag; + +import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; +import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; +import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SingleInputOperator; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.CostEstimator; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.InterestingProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.NamedChannel; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport; +import org.apache.flink.optimizer.util.NoOpUnaryUdfOp; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.DataExchangeMode; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.util.Visitor; + +import com.google.common.collect.Sets; + +/** + * A node 
in the optimizer's program representation for an operation with a single input. + * + * This class contains all the generic logic for handling branching flows, as well as to + * enumerate candidate execution plans. The subclasses for specific operators simply add logic + * for cost estimates and specify possible strategies for their execution. + */ +public abstract class SingleInputNode extends OptimizerNode { + + protected final FieldSet keys; // The set of key fields + + protected DagConnection inConn; // the input of the node + + // -------------------------------------------------------------------------------------------- + + /** + * Creates a new node with a single input for the optimizer plan. + * + * @param programOperator The PACT that the node represents. + */ + protected SingleInputNode(SingleInputOperator<?, ?, ?> programOperator) { + super(programOperator); + + int[] k = programOperator.getKeyColumns(0); + this.keys = k == null || k.length == 0 ? null : new FieldSet(k); + } + + protected SingleInputNode(FieldSet keys) { + super(NoOpUnaryUdfOp.INSTANCE); + this.keys = keys; + } + + protected SingleInputNode() { + super(NoOpUnaryUdfOp.INSTANCE); + this.keys = null; + } + + protected SingleInputNode(SingleInputNode toCopy) { + super(toCopy); + + this.keys = toCopy.keys; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public SingleInputOperator<?, ?, ?> getOperator() { + return (SingleInputOperator<?, ?, ?>) super.getOperator(); + } + + /** + * Gets the input of this operator. + * + * @return The input. + */ + public DagConnection getIncomingConnection() { + return this.inConn; + } + + /** + * Sets the connection through which this node receives its input. + * + * @param inConn The input connection to set. + */ + public void setIncomingConnection(DagConnection inConn) { + this.inConn = inConn; + } + + /** + * Gets the predecessor of this node. 
+ * + * @return The predecessor of this node. + */ + public OptimizerNode getPredecessorNode() { + if (this.inConn != null) { + return this.inConn.getSource(); + } else { + return null; + } + } + + @Override + public List<DagConnection> getIncomingConnections() { + return Collections.singletonList(this.inConn); + } + + + @Override + public SemanticProperties getSemanticProperties() { + return getOperator().getSemanticProperties(); + } + + + @Override + public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode) + throws CompilerException + { + // see if an internal hint dictates the strategy to use + final Configuration conf = getOperator().getParameters(); + final String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null); + final ShipStrategyType preSet; + + if (shipStrategy != null) { + if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH)) { + preSet = ShipStrategyType.PARTITION_HASH; + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) { + preSet = ShipStrategyType.PARTITION_RANGE; + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_FORWARD)) { + preSet = ShipStrategyType.FORWARD; + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { + preSet = ShipStrategyType.PARTITION_RANDOM; + } else { + throw new CompilerException("Unrecognized ship strategy hint: " + shipStrategy); + } + } else { + preSet = null; + } + + // get the predecessor node + Operator<?> children = ((SingleInputOperator<?, ?, ?>) getOperator()).getInput(); + + OptimizerNode pred; + DagConnection conn; + if (children == null) { + throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input."); + } else { + pred = contractToNode.get(children); + conn = new DagConnection(pred, this, defaultExchangeMode); + if (preSet != null) { + conn.setShipStrategy(preSet); + } + } + + // create the 
connection and add it + setIncomingConnection(conn); + pred.addOutgoingConnection(conn); + } + + // -------------------------------------------------------------------------------------------- + // Properties and Optimization + // -------------------------------------------------------------------------------------------- + + protected abstract List<OperatorDescriptorSingle> getPossibleProperties(); + + @Override + public void computeInterestingPropertiesForInputs(CostEstimator estimator) { + // get what we inherit and what is preserved by our user code + final InterestingProperties props = getInterestingProperties().filterByCodeAnnotations(this, 0); + + // add all properties relevant to this node + for (OperatorDescriptorSingle dps : getPossibleProperties()) { + for (RequestedGlobalProperties gp : dps.getPossibleGlobalProperties()) { + + if (gp.getPartitioning().isPartitionedOnKey()) { + // make sure that among the same partitioning types, we do not push anything down that has fewer key fields + + for (RequestedGlobalProperties contained : props.getGlobalProperties()) { + if (contained.getPartitioning() == gp.getPartitioning() && gp.getPartitionedFields().isValidSubset(contained.getPartitionedFields())) { + props.getGlobalProperties().remove(contained); + break; + } + } + } + + props.addGlobalProperties(gp); + } + + for (RequestedLocalProperties lp : dps.getPossibleLocalProperties()) { + props.addLocalProperties(lp); + } + } + this.inConn.setInterestingProperties(props); + + for (DagConnection conn : getBroadcastConnections()) { + conn.setInterestingProperties(new InterestingProperties()); + } + } + + + @Override + public List<PlanNode> getAlternativePlans(CostEstimator estimator) { + // check if we have a cached version + if (this.cachedPlans != null) { + return this.cachedPlans; + } + + boolean childrenSkippedDueToReplicatedInput = false; + + // calculate alternative sub-plans for predecessor + final List<? 
extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator); + final Set<RequestedGlobalProperties> intGlobal = this.inConn.getInterestingProperties().getGlobalProperties(); + + // calculate alternative sub-plans for broadcast inputs + final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>(); + List<DagConnection> broadcastConnections = getBroadcastConnections(); + List<String> broadcastConnectionNames = getBroadcastConnectionNames(); + + for (int i = 0; i < broadcastConnections.size(); i++ ) { + DagConnection broadcastConnection = broadcastConnections.get(i); + String broadcastConnectionName = broadcastConnectionNames.get(i); + List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator); + + // wrap the plan candidates in named channels + HashSet<NamedChannel> broadcastChannels = new HashSet<NamedChannel>(broadcastPlanCandidates.size()); + for (PlanNode plan: broadcastPlanCandidates) { + NamedChannel c = new NamedChannel(broadcastConnectionName, plan); + DataExchangeMode exMode = DataExchangeMode.select(broadcastConnection.getDataExchangeMode(), + ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline()); + c.setShipStrategy(ShipStrategyType.BROADCAST, exMode); + broadcastChannels.add(c); + } + broadcastPlanChannels.add(broadcastChannels); + } + + final RequestedGlobalProperties[] allValidGlobals; + { + Set<RequestedGlobalProperties> pairs = new HashSet<RequestedGlobalProperties>(); + for (OperatorDescriptorSingle ods : getPossibleProperties()) { + pairs.addAll(ods.getPossibleGlobalProperties()); + } + allValidGlobals = pairs.toArray(new RequestedGlobalProperties[pairs.size()]); + } + final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>(); + + final ExecutionMode executionMode = this.inConn.getDataExchangeMode(); + + final int dop = getParallelism(); + final int inDop = getPredecessorNode().getParallelism(); + final boolean 
dopChange = inDop != dop; + + final boolean breaksPipeline = this.inConn.isBreakingPipeline(); + + // create all candidates + for (PlanNode child : subPlans) { + + if (child.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if DOP is not changed + if (dopChange) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.inConn.setShipStrategy(ShipStrategyType.FORWARD); + } + } + + if (this.inConn.getShipStrategy() == null) { + // pick the strategy ourselves + for (RequestedGlobalProperties igps: intGlobal) { + final Channel c = new Channel(child, this.inConn.getMaterializationMode()); + igps.parameterizeChannel(c, dopChange, executionMode, breaksPipeline); + + // if the DOP changed, make sure that we cancel out properties, unless the + // ship strategy preserves/establishes them even under changing DOPs + if (dopChange && !c.getShipStrategy().isNetworkStrategy()) { + c.getGlobalProperties().reset(); + } + + // check whether we meet any of the accepted properties + // we may remove this check, when we do a check to not inherit + // requested global properties that are incompatible with all possible + // requested properties + for (RequestedGlobalProperties rgps: allValidGlobals) { + if (rgps.isMetBy(c.getGlobalProperties())) { + c.setRequiredGlobalProps(rgps); + addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator); + break; + } + } + } + } else { + // hint fixed the strategy + final Channel c = new Channel(child, this.inConn.getMaterializationMode()); + final ShipStrategyType shipStrategy = this.inConn.getShipStrategy(); + final DataExchangeMode exMode = DataExchangeMode.select(executionMode, shipStrategy, breaksPipeline); + + if (this.keys != null) { + c.setShipStrategy(shipStrategy, this.keys.toFieldList(), exMode); + } else { + c.setShipStrategy(shipStrategy, exMode); + } + + if (dopChange) { + 
c.adjustGlobalPropertiesForFullParallelismChange(); + } + + // check whether we meet any of the accepted properties + for (RequestedGlobalProperties rgps: allValidGlobals) { + if (rgps.isMetBy(c.getGlobalProperties())) { + addLocalCandidates(c, broadcastPlanChannels, rgps, outputPlans, estimator); + break; + } + } + } + } + + if(outputPlans.isEmpty()) { + if(childrenSkippedDueToReplicatedInput) { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Invalid use of replicated input."); + } else { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints."); + } + } + + // cost and prune the plans + for (PlanNode node : outputPlans) { + estimator.costOperator(node); + } + prunePlanAlternatives(outputPlans); + outputPlans.trimToSize(); + + this.cachedPlans = outputPlans; + return outputPlans; + } + + protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps, + List<PlanNode> target, CostEstimator estimator) + { + for (RequestedLocalProperties ilp : this.inConn.getInterestingProperties().getLocalProperties()) { + final Channel in = template.clone(); + ilp.parameterizeChannel(in); + + // instantiate a candidate, if the instantiated local properties meet one possible local property set + outer: + for (OperatorDescriptorSingle dps: getPossibleProperties()) { + for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) { + if (ilps.isMetBy(in.getLocalProperties())) { + in.setRequiredLocalProps(ilps); + instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp); + break outer; + } + } + } + } + } + + protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? 
extends NamedChannel>> broadcastPlanChannels, + List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq) + { + final PlanNode inputSource = in.getSource(); + + for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) { + + boolean validCombination = true; + boolean requiresPipelinebreaker = false; + + // check whether the broadcast inputs use the same plan candidate at the branching point + for (int i = 0; i < broadcastChannelsCombination.size(); i++) { + NamedChannel nc = broadcastChannelsCombination.get(i); + PlanNode bcSource = nc.getSource(); + + // check branch compatibility against input + if (!areBranchCompatible(bcSource, inputSource)) { + validCombination = false; + break; + } + + // check branch compatibility against all other broadcast variables + for (int k = 0; k < i; k++) { + PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource(); + + if (!areBranchCompatible(bcSource, otherBcSource)) { + validCombination = false; + break; + } + } + + // check if there is a common predecessor and whether there is a dam on the way to all common predecessors + if (this.hereJoinedBranches != null) { + for (OptimizerNode brancher : this.hereJoinedBranches) { + PlanNode candAtBrancher = in.getSource().getCandidateAtBranchPoint(brancher); + + if (candAtBrancher == null) { + // closed branch between two broadcast variables + continue; + } + + SourceAndDamReport res = in.getSource().hasDamOnPathDownTo(candAtBrancher); + if (res == NOT_FOUND) { + throw new CompilerException("Bug: Tracing dams for deadlock detection is broken."); + } else if (res == FOUND_SOURCE) { + requiresPipelinebreaker = true; + break; + } else if (res == FOUND_SOURCE_AND_DAM) { + // good + } else { + throw new CompilerException(); + } + } + } + } + + if (!validCombination) { + continue; + } + + if (requiresPipelinebreaker) { + 
in.setTempMode(in.getTempMode().makePipelineBreaker()); + } + + final SingleInputPlanNode node = dps.instantiate(in, this); + node.setBroadcastInputs(broadcastChannelsCombination); + + // compute how the strategy affects the properties + GlobalProperties gProps = in.getGlobalProperties().clone(); + LocalProperties lProps = in.getLocalProperties().clone(); + gProps = dps.computeGlobalProperties(gProps); + lProps = dps.computeLocalProperties(lProps); + + SemanticProperties props = this.getSemanticProperties(); + // filter by the user code field copies + gProps = gProps.filterBySemanticProperties(props, 0); + lProps = lProps.filterBySemanticProperties(props, 0); + + // apply + node.initProperties(gProps, lProps); + node.updatePropertiesWithUniqueSets(getUniqueFields()); + target.add(node); + } + } + + // -------------------------------------------------------------------------------------------- + // Branch Handling + // -------------------------------------------------------------------------------------------- + + @Override + public void computeUnclosedBranchStack() { + if (this.openBranches != null) { + return; + } + + addClosedBranches(getPredecessorNode().closedBranchingNodes); + List<UnclosedBranchDescriptor> fromInput = getPredecessorNode().getBranchesForParent(this.inConn); + + // handle the data flow branching for the broadcast inputs + List<UnclosedBranchDescriptor> result = computeUnclosedBranchStackForBroadcastInputs(fromInput); + + this.openBranches = (result == null || result.isEmpty()) ? 
Collections.<UnclosedBranchDescriptor>emptyList() : result; + } + + // -------------------------------------------------------------------------------------------- + // Miscellaneous + // -------------------------------------------------------------------------------------------- + + @Override + public void accept(Visitor<OptimizerNode> visitor) { + if (visitor.preVisit(this)) { + if (getPredecessorNode() != null) { + getPredecessorNode().accept(visitor); + } else { + throw new CompilerException(); + } + for (DagConnection connection : getBroadcastConnections()) { + connection.getSource().accept(visitor); + } + visitor.postVisit(this); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java new file mode 100644 index 0000000..40725ba --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.typeinfo.NothingTypeInfo; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual; +import org.apache.flink.optimizer.operators.UtilSinkJoinOpDescriptor; +import org.apache.flink.optimizer.util.NoOpBinaryUdfOp; +import org.apache.flink.types.Nothing; + +/** + * This class represents a utility node that is not part of the actual plan. 
+ * It is used for plans with multiple data sinks to transform it into a plan with + * a single root node. That way, the code that makes sure no costs are double-counted and that + * candidate selection works correctly with nodes that have multiple outputs is transparently reused. + */ +public class SinkJoiner extends TwoInputNode { + + public SinkJoiner(OptimizerNode input1, OptimizerNode input2) { + super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo())); + + DagConnection conn1 = new DagConnection(input1, this, null, ExecutionMode.PIPELINED); + DagConnection conn2 = new DagConnection(input2, this, null, ExecutionMode.PIPELINED); + + this.input1 = conn1; + this.input2 = conn2; + + setDegreeOfParallelism(1); + } + + @Override + public String getName() { + return "Internal Utility Node"; + } + + @Override + public List<DagConnection> getOutgoingConnections() { + return Collections.emptyList(); + } + + @Override + public void computeUnclosedBranchStack() { + if (this.openBranches != null) { + return; + } + + addClosedBranches(getFirstPredecessorNode().closedBranchingNodes); + addClosedBranches(getSecondPredecessorNode().closedBranchingNodes); + + List<UnclosedBranchDescriptor> pred1branches = getFirstPredecessorNode().openBranches; + List<UnclosedBranchDescriptor> pred2branches = getSecondPredecessorNode().openBranches; + + // if the predecessors do not have branches, then we have multiple sinks that do not originate from + // a common data flow. + if (pred1branches == null || pred1branches.isEmpty()) { + + this.openBranches = (pred2branches == null || pred2branches.isEmpty()) ? 
+ Collections.<UnclosedBranchDescriptor>emptyList() : // both empty - disconnected flow + pred2branches; + } + else if (pred2branches == null || pred2branches.isEmpty()) { + this.openBranches = pred1branches; + } + else { + // copy the lists and merge + List<UnclosedBranchDescriptor> result1 = new ArrayList<UnclosedBranchDescriptor>(pred1branches); + List<UnclosedBranchDescriptor> result2 = new ArrayList<UnclosedBranchDescriptor>(pred2branches); + + ArrayList<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>(); + mergeLists(result1, result2, result, false); + + this.openBranches = result.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : result; + } + } + + @Override + protected List<OperatorDescriptorDual> getPossibleProperties() { + return Collections.<OperatorDescriptorDual>singletonList(new UtilSinkJoinOpDescriptor()); + } + + @Override + public void computeOutputEstimates(DataStatistics statistics) { + // nothing to be done here + } + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // no estimates needed at this point + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java new file mode 100644 index 0000000..1292cf5 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.optimizer.plan.SolutionSetPlanNode; + +/** + * The optimizer's internal representation of the solution set of a workset iteration. 
+ */ +public class SolutionSetNode extends AbstractPartialSolutionNode { + + private final WorksetIterationNode iterationNode; + + + public SolutionSetNode(SolutionSetPlaceHolder<?> psph, WorksetIterationNode iterationNode) { + super(psph); + this.iterationNode = iterationNode; + } + + // -------------------------------------------------------------------------------------------- + + public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) { + this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this, "SolutionSet("+this.getOperator().getName()+")", gProps, lProps, initialInput)); + } + + public SolutionSetPlanNode getCurrentSolutionSetPlanNode() { + if (this.cachedPlans != null) { + return (SolutionSetPlanNode) this.cachedPlans.get(0); + } else { + throw new IllegalStateException(); + } + } + + public WorksetIterationNode getIterationNode() { + return this.iterationNode; + } + + @Override + public void computeOutputEstimates(DataStatistics statistics) { + copyEstimates(this.iterationNode.getInitialSolutionSetPredecessorNode()); + } + + // -------------------------------------------------------------------------------------------- + + /** + * Gets the contract object for this data source node. + * + * @return The contract. 
+ */ + @Override + public SolutionSetPlaceHolder<?> getOperator() { + return (SolutionSetPlaceHolder<?>) super.getOperator(); + } + + @Override + public String getName() { + return "Solution Set"; + } + + @Override + public void computeUnclosedBranchStack() { + if (this.openBranches != null) { + return; + } + + DagConnection solutionSetInput = this.iterationNode.getFirstIncomingConnection(); + OptimizerNode solutionSetSource = solutionSetInput.getSource(); + + addClosedBranches(solutionSetSource.closedBranchingNodes); + List<UnclosedBranchDescriptor> fromInput = solutionSetSource.getBranchesForParent(solutionSetInput); + this.openBranches = (fromInput == null || fromInput.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java new file mode 100644 index 0000000..83bc39a --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.dag; + +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +import java.util.Collections; +import java.util.List; + +/** + * The optimizer's internal representation of a <i>SortPartition</i> operator node. 
+ */ +public class SortPartitionNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> possibleProperties; + + public SortPartitionNode(SortPartitionOperatorBase<?> operator) { + super(operator); + + OperatorDescriptorSingle descr = new SortPartitionDescriptor(operator.getPartitionOrdering()); + this.possibleProperties = Collections.singletonList(descr); + } + + @Override + public SortPartitionOperatorBase<?> getOperator() { + return (SortPartitionOperatorBase<?>) super.getOperator(); + } + + @Override + public String getName() { + return "Sort-Partition"; + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.possibleProperties; + } + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // sorting does not change the number of records + this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); + this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize(); + } + + @Override + public SemanticProperties getSemanticProperties() { + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); + } + + // -------------------------------------------------------------------------------------------- + + public static class SortPartitionDescriptor extends OperatorDescriptorSingle { + + private Ordering partitionOrder; + + public SortPartitionDescriptor(Ordering partitionOrder) { + this.partitionOrder = partitionOrder; + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.UNARY_NO_OP; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + return new SingleInputPlanNode(node, "Sort-Partition", in, DriverStrategy.UNARY_NO_OP); + } + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + // sort partition does not require any global property + return Collections.singletonList(new RequestedGlobalProperties()); + } + + 
@Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + // set partition order as required local property + RequestedLocalProperties rlp = new RequestedLocalProperties(); + rlp.setOrdering(this.partitionOrder); + + return Collections.singletonList(rlp); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + // sort partition is a no-operation operation, such that all global properties are preserved. + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + // sort partition is a no-operation operation, such that the incoming local properties are passed through unchanged. + return lProps; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TempMode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TempMode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TempMode.java new file mode 100644 index 0000000..0d1dfc9 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TempMode.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +/** + * Enumeration to indicate the mode of temporarily materializing the data that flows across a connection. + * Introducing such an artificial dam is sometimes necessary to prevent certain data flows from deadlocking + * themselves, or to cache an intermediate result so that it can be replayed. + */ +public enum TempMode { + + NONE(false, false), + PIPELINE_BREAKER(false, true), + CACHED(true, false), + CACHING_PIPELINE_BREAKER(true, true); + + // -------------------------------------------------------------------------------------------- + + private final boolean cached; + + private final boolean breaksPipeline; + + + private TempMode(boolean cached, boolean breaksPipeline) { + this.cached = cached; + this.breaksPipeline = breaksPipeline; + } + + public boolean isCached() { + return cached; + } + + public boolean breaksPipeline() { + return breaksPipeline; + } + + public TempMode makePipelineBreaker() { + if (this == NONE) { + return PIPELINE_BREAKER; + } else if (this == CACHED) { + return CACHING_PIPELINE_BREAKER; + } else { + return this; + } + } + + public TempMode makeCached() { + if (this == NONE) { + return CACHED; + } else if (this == PIPELINE_BREAKER) { + return CACHING_PIPELINE_BREAKER; + } else { + return this; + } + } + + + public TempMode makeNonCached() { + if (this == CACHED) { + return NONE; + } else if (this == CACHING_PIPELINE_BREAKER) { + return PIPELINE_BREAKER; + } else { + return this; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java new file mode 100644 index 
0000000..39da165 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.dag; + +import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; +import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; +import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.operators.DualInputOperator; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.CostEstimator; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import 
org.apache.flink.optimizer.dataproperties.InterestingProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual.GlobalPropertiesPair; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual.LocalPropertiesPair; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.NamedChannel; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.DataExchangeMode; +import org.apache.flink.runtime.operators.DamBehavior; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.util.Visitor; + +import com.google.common.collect.Sets; + +/** + * A node in the optimizer plan that represents a PACT with two different inputs, such as MATCH or CROSS. + * The two inputs are not substitutable in their sides. + */ +public abstract class TwoInputNode extends OptimizerNode { + + protected final FieldList keys1; // The set of key fields for the first input + + protected final FieldList keys2; // The set of key fields for the second input + + protected DagConnection input1; // The first input edge + + protected DagConnection input2; // The second input edge + + private List<OperatorDescriptorDual> cachedDescriptors; + + // -------------------------------------------------------------------------------------------- + + /** + * Creates a new node with two inputs for the optimizer plan. 
+ * + * @param pactContract + * The PACT that the node represents. + */ + public TwoInputNode(DualInputOperator<?, ?, ?, ?> pactContract) { + super(pactContract); + + int[] k1 = pactContract.getKeyColumns(0); + int[] k2 = pactContract.getKeyColumns(1); + + this.keys1 = k1 == null || k1.length == 0 ? null : new FieldList(k1); + this.keys2 = k2 == null || k2.length == 0 ? null : new FieldList(k2); + + if (this.keys1 != null) { + if (this.keys2 != null) { + if (this.keys1.size() != this.keys2.size()) { + throw new CompilerException("Unequal number of key fields on the two inputs."); + } + } else { + throw new CompilerException("Keys are set on first input, but not on second."); + } + } else if (this.keys2 != null) { + throw new CompilerException("Keys are set on second input, but not on first."); + } + } + + // ------------------------------------------------------------------------ + + @Override + public DualInputOperator<?, ?, ?, ?> getOperator() { + return (DualInputOperator<?, ?, ?, ?>) super.getOperator(); + } + + /** + * Gets the <tt>DagConnection</tt> through which this node receives its <i>first</i> input. + * + * @return The first input connection. + */ + public DagConnection getFirstIncomingConnection() { + return this.input1; + } + + /** + * Gets the <tt>DagConnection</tt> through which this node receives its <i>second</i> input. + * + * @return The second input connection. 
+ */ + public DagConnection getSecondIncomingConnection() { + return this.input2; + } + + public OptimizerNode getFirstPredecessorNode() { + if(this.input1 != null) { + return this.input1.getSource(); + } else { + return null; + } + } + + public OptimizerNode getSecondPredecessorNode() { + if(this.input2 != null) { + return this.input2.getSource(); + } else { + return null; + } + } + + @Override + public List<DagConnection> getIncomingConnections() { + ArrayList<DagConnection> inputs = new ArrayList<DagConnection>(2); + inputs.add(input1); + inputs.add(input2); + return inputs; + } + + + @Override + public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExecutionMode) { + // see if there is a hint that dictates which shipping strategy to use for BOTH inputs + final Configuration conf = getOperator().getParameters(); + ShipStrategyType preSet1 = null; + ShipStrategyType preSet2 = null; + + String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null); + if (shipStrategy != null) { + if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { + preSet1 = preSet2 = ShipStrategyType.FORWARD; + } else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { + preSet1 = preSet2 = ShipStrategyType.BROADCAST; + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { + preSet1 = preSet2 = ShipStrategyType.PARTITION_HASH; + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { + preSet1 = preSet2 = ShipStrategyType.PARTITION_RANGE; + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { + preSet1 = preSet2 = ShipStrategyType.PARTITION_RANDOM; + } else { + throw new CompilerException("Unknown hint for shipping strategy: " + shipStrategy); + } + } + + // see if there is a hint that dictates which shipping strategy to use for the FIRST input + shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, null); + if 
(shipStrategy != null) { + if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { + preSet1 = ShipStrategyType.FORWARD; + } else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { + preSet1 = ShipStrategyType.BROADCAST; + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { + preSet1 = ShipStrategyType.PARTITION_HASH; + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { + preSet1 = ShipStrategyType.PARTITION_RANGE; + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { + preSet1 = ShipStrategyType.PARTITION_RANDOM; + } else { + throw new CompilerException("Unknown hint for shipping strategy of input one: " + shipStrategy); + } + } + + // see if there is a hint that dictates which shipping strategy to use for the SECOND input + shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, null); + if (shipStrategy != null) { + if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { + preSet2 = ShipStrategyType.FORWARD; + } else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { + preSet2 = ShipStrategyType.BROADCAST; + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { + preSet2 = ShipStrategyType.PARTITION_HASH; + } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { + preSet2 = ShipStrategyType.PARTITION_RANGE; + } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { + preSet2 = ShipStrategyType.PARTITION_RANDOM; + } else { + throw new CompilerException("Unknown hint for shipping strategy of input two: " + shipStrategy); + } + } + + // get the predecessors + DualInputOperator<?, ?, ?, ?> contr = getOperator(); + + Operator<?> leftPred = contr.getFirstInput(); + Operator<?> rightPred = contr.getSecondInput(); + + OptimizerNode pred1; + DagConnection conn1; + if (leftPred == null) { + throw new CompilerException("Error: Node 
for '" + getOperator().getName() + "' has no input set for first input."); + } else { + pred1 = contractToNode.get(leftPred); + conn1 = new DagConnection(pred1, this, defaultExecutionMode); + if (preSet1 != null) { + conn1.setShipStrategy(preSet1); + } + } + + // create the connection and add it + this.input1 = conn1; + pred1.addOutgoingConnection(conn1); + + OptimizerNode pred2; + DagConnection conn2; + if (rightPred == null) { + throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input set for second input."); + } else { + pred2 = contractToNode.get(rightPred); + conn2 = new DagConnection(pred2, this, defaultExecutionMode); + if (preSet2 != null) { + conn2.setShipStrategy(preSet2); + } + } + + // create the connection and add it + this.input2 = conn2; + pred2.addOutgoingConnection(conn2); + } + + protected abstract List<OperatorDescriptorDual> getPossibleProperties(); + + private List<OperatorDescriptorDual> getProperties() { + if (this.cachedDescriptors == null) { + this.cachedDescriptors = getPossibleProperties(); + } + return this.cachedDescriptors; + } + + @Override + public void computeInterestingPropertiesForInputs(CostEstimator estimator) { + // get what we inherit and what is preserved by our user code + final InterestingProperties props1 = getInterestingProperties().filterByCodeAnnotations(this, 0); + final InterestingProperties props2 = getInterestingProperties().filterByCodeAnnotations(this, 1); + + // add all properties relevant to this node + for (OperatorDescriptorDual dpd : getProperties()) { + for (GlobalPropertiesPair gp : dpd.getPossibleGlobalProperties()) { + // input 1 + props1.addGlobalProperties(gp.getProperties1()); + + // input 2 + props2.addGlobalProperties(gp.getProperties2()); + } + for (LocalPropertiesPair lp : dpd.getPossibleLocalProperties()) { + // input 1 + props1.addLocalProperties(lp.getProperties1()); + + // input 2 + props2.addLocalProperties(lp.getProperties2()); + } + } + 
this.input1.setInterestingProperties(props1); + this.input2.setInterestingProperties(props2); + + for (DagConnection conn : getBroadcastConnections()) { + conn.setInterestingProperties(new InterestingProperties()); + } + } + + @Override + public List<PlanNode> getAlternativePlans(CostEstimator estimator) { + // check if we have a cached version + if (this.cachedPlans != null) { + return this.cachedPlans; + } + + boolean childrenSkippedDueToReplicatedInput = false; + + // step down to all producer nodes and calculate alternative plans + final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator); + final List<? extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator); + + // calculate alternative sub-plans for predecessor + final Set<RequestedGlobalProperties> intGlobal1 = this.input1.getInterestingProperties().getGlobalProperties(); + final Set<RequestedGlobalProperties> intGlobal2 = this.input2.getInterestingProperties().getGlobalProperties(); + + // calculate alternative sub-plans for broadcast inputs + final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? 
extends NamedChannel>>(); + List<DagConnection> broadcastConnections = getBroadcastConnections(); + List<String> broadcastConnectionNames = getBroadcastConnectionNames(); + + for (int i = 0; i < broadcastConnections.size(); i++ ) { + DagConnection broadcastConnection = broadcastConnections.get(i); + String broadcastConnectionName = broadcastConnectionNames.get(i); + List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator); + + // wrap the plan candidates in named channels + HashSet<NamedChannel> broadcastChannels = new HashSet<NamedChannel>(broadcastPlanCandidates.size()); + for (PlanNode plan: broadcastPlanCandidates) { + final NamedChannel c = new NamedChannel(broadcastConnectionName, plan); + DataExchangeMode exMode = DataExchangeMode.select(broadcastConnection.getDataExchangeMode(), + ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline()); + c.setShipStrategy(ShipStrategyType.BROADCAST, exMode); + broadcastChannels.add(c); + } + broadcastPlanChannels.add(broadcastChannels); + } + + final GlobalPropertiesPair[] allGlobalPairs; + final LocalPropertiesPair[] allLocalPairs; + { + Set<GlobalPropertiesPair> pairsGlob = new HashSet<GlobalPropertiesPair>(); + Set<LocalPropertiesPair> pairsLoc = new HashSet<LocalPropertiesPair>(); + for (OperatorDescriptorDual ods : getProperties()) { + pairsGlob.addAll(ods.getPossibleGlobalProperties()); + pairsLoc.addAll(ods.getPossibleLocalProperties()); + } + allGlobalPairs = pairsGlob.toArray(new GlobalPropertiesPair[pairsGlob.size()]); + allLocalPairs = pairsLoc.toArray(new LocalPropertiesPair[pairsLoc.size()]); + } + + final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>(); + + final ExecutionMode input1Mode = this.input1.getDataExchangeMode(); + final ExecutionMode input2Mode = this.input2.getDataExchangeMode(); + + final int dop = getParallelism(); + final int inDop1 = getFirstPredecessorNode().getParallelism(); + final int inDop2 = 
getSecondPredecessorNode().getParallelism(); + + final boolean dopChange1 = dop != inDop1; + final boolean dopChange2 = dop != inDop2; + + final boolean input1breaksPipeline = this.input1.isBreakingPipeline(); + final boolean input2breaksPipeline = this.input2.isBreakingPipeline(); + + // enumerate all pairwise combination of the children's plans together with + // all possible operator strategy combination + + // create all candidates + for (PlanNode child1 : subPlans1) { + + if (child1.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if DOP is not changed + if (dopChange1) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.input1.setShipStrategy(ShipStrategyType.FORWARD); + } + } + + for (PlanNode child2 : subPlans2) { + + if (child2.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if DOP is not changed + if (dopChange2) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.input2.setShipStrategy(ShipStrategyType.FORWARD); + } + } + + // check that the children go together. that is the case if they build upon the same + // candidate at the joined branch plan. + if (!areBranchCompatible(child1, child2)) { + continue; + } + + for (RequestedGlobalProperties igps1: intGlobal1) { + // create a candidate channel for the first input. 
mark it cached, if the connection says so + final Channel c1 = new Channel(child1, this.input1.getMaterializationMode()); + if (this.input1.getShipStrategy() == null) { + // free to choose the ship strategy + igps1.parameterizeChannel(c1, dopChange1, input1Mode, input1breaksPipeline); + + // if the DOP changed, make sure that we cancel out properties, unless the + // ship strategy preserves/establishes them even under changing DOPs + if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) { + c1.getGlobalProperties().reset(); + } + } + else { + // ship strategy fixed by compiler hint + ShipStrategyType shipType = this.input1.getShipStrategy(); + DataExchangeMode exMode = DataExchangeMode.select(input1Mode, shipType, input1breaksPipeline); + if (this.keys1 != null) { + c1.setShipStrategy(shipType, this.keys1.toFieldList(), exMode); + } + else { + c1.setShipStrategy(shipType, exMode); + } + + if (dopChange1) { + c1.adjustGlobalPropertiesForFullParallelismChange(); + } + } + + for (RequestedGlobalProperties igps2: intGlobal2) { + // create a candidate channel for the second input. 
mark it cached, if the connection says so + final Channel c2 = new Channel(child2, this.input2.getMaterializationMode()); + if (this.input2.getShipStrategy() == null) { + // free to choose the ship strategy + igps2.parameterizeChannel(c2, dopChange2, input2Mode, input2breaksPipeline); + + // if the DOP changed, make sure that we cancel out properties, unless the + // ship strategy preserves/establishes them even under changing DOPs + if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) { + c2.getGlobalProperties().reset(); + } + } else { + // ship strategy fixed by compiler hint + ShipStrategyType shipType = this.input2.getShipStrategy(); + DataExchangeMode exMode = DataExchangeMode.select(input2Mode, shipType, input2breaksPipeline); + if (this.keys2 != null) { + c2.setShipStrategy(shipType, this.keys2.toFieldList(), exMode); + } else { + c2.setShipStrategy(shipType, exMode); + } + + if (dopChange2) { + c2.adjustGlobalPropertiesForFullParallelismChange(); + } + } + + /* ******************************************************************** + * NOTE: Depending on how we proceed with different partitioning, + * we might at some point need a compatibility check between + * the pairs of global properties. 
+ * *******************************************************************/ + + outer: + for (GlobalPropertiesPair gpp : allGlobalPairs) { + if (gpp.getProperties1().isMetBy(c1.getGlobalProperties()) && + gpp.getProperties2().isMetBy(c2.getGlobalProperties()) ) + { + for (OperatorDescriptorDual desc : getProperties()) { + if (desc.areCompatible(gpp.getProperties1(), gpp.getProperties2(), + c1.getGlobalProperties(), c2.getGlobalProperties())) + { + Channel c1Clone = c1.clone(); + c1Clone.setRequiredGlobalProps(gpp.getProperties1()); + c2.setRequiredGlobalProps(gpp.getProperties2()); + + // we form a valid combination, so create the local candidates + // for this + addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2, + outputPlans, allLocalPairs, estimator); + break outer; + } + } + } + } + + // break the loop over input2's possible global properties, if the property + // is fixed via a hint. All the properties are overridden by the hint anyways, + // so we can stop after the first + if (this.input2.getShipStrategy() != null) { + break; + } + } + + // break the loop over input1's possible global properties, if the property + // is fixed via a hint. All the properties are overridden by the hint anyways, + // so we can stop after the first + if (this.input1.getShipStrategy() != null) { + break; + } + } + } + } + + if(outputPlans.isEmpty()) { + if(childrenSkippedDueToReplicatedInput) { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + + ". Most likely reason: Invalid use of replicated input."); + } else { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + + ". 
Most likely reason: Too restrictive plan hints."); + } + } + + // cost and prune the plans + for (PlanNode node : outputPlans) { + estimator.costOperator(node); + } + prunePlanAlternatives(outputPlans); + outputPlans.trimToSize(); + + this.cachedPlans = outputPlans; + return outputPlans; + } + + protected void addLocalCandidates(Channel template1, Channel template2, List<Set<? extends NamedChannel>> broadcastPlanChannels, + RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2, + List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator) + { + for (RequestedLocalProperties ilp1 : this.input1.getInterestingProperties().getLocalProperties()) { + final Channel in1 = template1.clone(); + ilp1.parameterizeChannel(in1); + + for (RequestedLocalProperties ilp2 : this.input2.getInterestingProperties().getLocalProperties()) { + final Channel in2 = template2.clone(); + ilp2.parameterizeChannel(in2); + + for (OperatorDescriptorDual dps: getProperties()) { + for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) { + if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) && + lpp.getProperties2().isMetBy(in2.getLocalProperties()) ) + { + // valid combination + // for non trivial local properties, we need to check that they are co compatible + // (such as when some sort order is requested, that both are the same sort order + if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(), + in1.getLocalProperties(), in2.getLocalProperties())) + { + // copy, because setting required properties and instantiation may + // change the channels and should not affect prior candidates + Channel in1Copy = in1.clone(); + in1Copy.setRequiredLocalProps(lpp.getProperties1()); + + Channel in2Copy = in2.clone(); + in2Copy.setRequiredLocalProps(lpp.getProperties2()); + + // all right, co compatible + instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2); + break; + } + // else cannot 
use this pair, fall through the loop and try the next one + } + } + } + } + } + } + + protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel in2, + List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator, + RequestedGlobalProperties globPropsReq1, RequestedGlobalProperties globPropsReq2, + RequestedLocalProperties locPropsReq1, RequestedLocalProperties locPropsReq2) + { + final PlanNode inputSource1 = in1.getSource(); + final PlanNode inputSource2 = in2.getSource(); + + for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) { + + boolean validCombination = true; + + // check whether the broadcast inputs use the same plan candidate at the branching point + for (int i = 0; i < broadcastChannelsCombination.size(); i++) { + NamedChannel nc = broadcastChannelsCombination.get(i); + PlanNode bcSource = nc.getSource(); + + if (!(areBranchCompatible(bcSource, inputSource1) || areBranchCompatible(bcSource, inputSource2))) { + validCombination = false; + break; + } + + // check branch compatibility against all other broadcast variables + for (int k = 0; k < i; k++) { + PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource(); + + if (!areBranchCompatible(bcSource, otherBcSource)) { + validCombination = false; + break; + } + } + } + + if (!validCombination) { + continue; + } + + placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2); + + DualInputPlanNode node = operator.instantiate(in1, in2, this); + node.setBroadcastInputs(broadcastChannelsCombination); + + SemanticProperties props = this.getSemanticProperties(); + GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0); + GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1); + GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2); + + LocalProperties lp1 = 
in1.getLocalProperties().clone().filterBySemanticProperties(props, 0); + LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(props, 1); + LocalProperties locals = operator.computeLocalProperties(lp1, lp2); + + node.initProperties(combined, locals); + node.updatePropertiesWithUniqueSets(getUniqueFields()); + target.add(node); + } + } + + protected void placePipelineBreakersIfNecessary(DriverStrategy strategy, Channel in1, Channel in2) { + // before we instantiate, check for deadlocks by tracing back to the open branches and checking + // whether either no input, or all of them have a dam + if (this.hereJoinedBranches != null && this.hereJoinedBranches.size() > 0) { + boolean someDamOnLeftPaths = false; + boolean damOnAllLeftPaths = true; + boolean someDamOnRightPaths = false; + boolean damOnAllRightPaths = true; + + if (strategy.firstDam() == DamBehavior.FULL_DAM || in1.getLocalStrategy().dams() || in1.getTempMode().breaksPipeline()) { + someDamOnLeftPaths = true; + } else { + for (OptimizerNode brancher : this.hereJoinedBranches) { + PlanNode candAtBrancher = in1.getSource().getCandidateAtBranchPoint(brancher); + + // not all candidates are found, because this list includes joined branches from both regular inputs and broadcast vars + if (candAtBrancher == null) { + continue; + } + + SourceAndDamReport res = in1.getSource().hasDamOnPathDownTo(candAtBrancher); + if (res == NOT_FOUND) { + throw new CompilerException("Bug: Tracing dams for deadlock detection is broken."); + } else if (res == FOUND_SOURCE) { + damOnAllLeftPaths = false; + } else if (res == FOUND_SOURCE_AND_DAM) { + someDamOnLeftPaths = true; + } else { + throw new CompilerException(); + } + } + } + + if (strategy.secondDam() == DamBehavior.FULL_DAM || in2.getLocalStrategy().dams() || in2.getTempMode().breaksPipeline()) { + someDamOnRightPaths = true; + } else { + for (OptimizerNode brancher : this.hereJoinedBranches) { + PlanNode candAtBrancher = 
in2.getSource().getCandidateAtBranchPoint(brancher); + + // not all candidates are found, because this list includes joined branches from both regular inputs and broadcast vars + if (candAtBrancher == null) { + continue; + } + + SourceAndDamReport res = in2.getSource().hasDamOnPathDownTo(candAtBrancher); + if (res == NOT_FOUND) { + throw new CompilerException("Bug: Tracing dams for deadlock detection is broken."); + } else if (res == FOUND_SOURCE) { + damOnAllRightPaths = false; + } else if (res == FOUND_SOURCE_AND_DAM) { + someDamOnRightPaths = true; + } else { + throw new CompilerException(); + } + } + } + + // okay combinations are both all dam or both no dam + if ( (damOnAllLeftPaths & damOnAllRightPaths) | (!someDamOnLeftPaths & !someDamOnRightPaths) ) { + // good, either both materialize already on the way, or both fully pipeline + } else { + if (someDamOnLeftPaths & !damOnAllRightPaths) { + // right needs a pipeline breaker + in2.setTempMode(in2.getTempMode().makePipelineBreaker()); + } + + if (someDamOnRightPaths & !damOnAllLeftPaths) { + // left needs a pipeline breaker + in1.setTempMode(in1.getTempMode().makePipelineBreaker()); + } + } + } + } + + @Override + public void computeUnclosedBranchStack() { + if (this.openBranches != null) { + return; + } + + // handle the data flow branching for the regular inputs + addClosedBranches(getFirstPredecessorNode().closedBranchingNodes); + addClosedBranches(getSecondPredecessorNode().closedBranchingNodes); + + List<UnclosedBranchDescriptor> result1 = getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection()); + List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection()); + + ArrayList<UnclosedBranchDescriptor> inputsMerged = new ArrayList<UnclosedBranchDescriptor>(); + mergeLists(result1, result2, inputsMerged, true); + + // handle the data flow branching for the broadcast inputs + List<UnclosedBranchDescriptor> result = 
computeUnclosedBranchStackForBroadcastInputs(inputsMerged); + + this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result; + } + + @Override + public SemanticProperties getSemanticProperties() { + return getOperator().getSemanticProperties(); + } + + // -------------------------------------------------------------------------------------------- + // Miscellaneous + // -------------------------------------------------------------------------------------------- + + @Override + public void accept(Visitor<OptimizerNode> visitor) { + if (visitor.preVisit(this)) { + if (this.input1 == null || this.input2 == null) { + throw new CompilerException(); + } + + getFirstPredecessorNode().accept(visitor); + getSecondPredecessorNode().accept(visitor); + + for (DagConnection connection : getBroadcastConnections()) { + connection.getSource().accept(visitor); + } + + visitor.postVisit(this); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java new file mode 100644 index 0000000..45ecdac --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; + + +public class UnaryOperatorNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> operator; + + private final String name; + + + + public UnaryOperatorNode(String name, FieldSet keys, OperatorDescriptorSingle ... operators) { + this(name, keys, Arrays.asList(operators)); + } + + public UnaryOperatorNode(String name, FieldSet keys, List<OperatorDescriptorSingle> operators) { + super(keys); + + this.operator = operators; + this.name = name; + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.operator; + } + + @Override + public String getName() { + return this.name; + } + + @Override + public SemanticProperties getSemanticProperties() { + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); + } + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // we have no estimates by default + } +}