http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java deleted file mode 100644 index 0d1dfc9..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.dag; - -/** - * Enumeration to indicate the mode of temporarily materializing the data that flows across a connection. - * Introducing such an artificial dam is sometimes necessary to prevent certain data flows from deadlocking - * themselves, or to cache an intermediate result so that it can be replayed. - */ -public enum TempMode { - - NONE(false, false), - PIPELINE_BREAKER(false, true), - CACHED(true, false), - CACHING_PIPELINE_BREAKER(true, true); - - // -------------------------------------------------------------------------------------------- - - private final boolean cached; - - private final boolean breaksPipeline; - - - private TempMode(boolean cached, boolean breaksPipeline) { - this.cached = cached; - this.breaksPipeline = breaksPipeline; - } - - public boolean isCached() { - return cached; - } - - public boolean breaksPipeline() { - return breaksPipeline; - } - - public TempMode makePipelineBreaker() { - if (this == NONE) { - return PIPELINE_BREAKER; - } else if (this == CACHED) { - return CACHING_PIPELINE_BREAKER; - } else { - return this; - } - } - - public TempMode makeCached() { - if (this == NONE) { - return CACHED; - } else if (this == PIPELINE_BREAKER) { - return CACHING_PIPELINE_BREAKER; - } else { - return this; - } - } - - - public TempMode makeNonCached() { - if (this == CACHED) { - return NONE; - } else if (this == CACHING_PIPELINE_BREAKER) { - return PIPELINE_BREAKER; - } else { - return this; - } - } -}
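A note on the TempMode enum removed above: its four constants encode two independent flags (cached, breaksPipeline), and the make* methods only ever move between those four combinations; in particular, makeNonCached() drops the caching flag but never removes an established pipeline breaker. A minimal illustrative sketch of the transitions (not part of the commit):

    TempMode mode = TempMode.NONE;
    mode = mode.makeCached();            // NONE -> CACHED
    mode = mode.makePipelineBreaker();   // CACHED -> CACHING_PIPELINE_BREAKER
    mode = mode.makeNonCached();         // caching dropped, dam kept -> PIPELINE_BREAKER
    assert mode.breaksPipeline() && !mode.isCached();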
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java deleted file mode 100644 index 39da165..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java +++ /dev/null @@ -1,747 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.dag; - -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.api.common.ExecutionMode; -import org.apache.flink.api.common.operators.DualInputOperator; -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.operators.SemanticProperties; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.Optimizer; -import org.apache.flink.optimizer.costs.CostEstimator; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.InterestingProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.operators.OperatorDescriptorDual; -import org.apache.flink.optimizer.operators.OperatorDescriptorDual.GlobalPropertiesPair; -import org.apache.flink.optimizer.operators.OperatorDescriptorDual.LocalPropertiesPair; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.NamedChannel; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.io.network.DataExchangeMode; -import org.apache.flink.runtime.operators.DamBehavior; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.util.Visitor; - -import 
com.google.common.collect.Sets; - -/** - * A node in the optimizer plan that represents a PACT with two different inputs, such as MATCH or CROSS. - * The two inputs are not interchangeable; each side has a fixed role. - */ -public abstract class TwoInputNode extends OptimizerNode { - - protected final FieldList keys1; // The set of key fields for the first input - - protected final FieldList keys2; // The set of key fields for the second input - - protected DagConnection input1; // The first input edge - - protected DagConnection input2; // The second input edge - - private List<OperatorDescriptorDual> cachedDescriptors; - - // -------------------------------------------------------------------------------------------- - - /** - * Creates a new two-input node for the optimizer plan. - * - * @param pactContract - * The PACT that the node represents. - */ - public TwoInputNode(DualInputOperator<?, ?, ?, ?> pactContract) { - super(pactContract); - - int[] k1 = pactContract.getKeyColumns(0); - int[] k2 = pactContract.getKeyColumns(1); - - this.keys1 = k1 == null || k1.length == 0 ? null : new FieldList(k1); - this.keys2 = k2 == null || k2.length == 0 ? null : new FieldList(k2); - - if (this.keys1 != null) { - if (this.keys2 != null) { - if (this.keys1.size() != this.keys2.size()) { - throw new CompilerException("Unequal number of key fields on the two inputs."); - } - } else { - throw new CompilerException("Keys are set on first input, but not on second."); - } - } else if (this.keys2 != null) { - throw new CompilerException("Keys are set on second input, but not on first."); - } - } - - // ------------------------------------------------------------------------ - - @Override - public DualInputOperator<?, ?, ?, ?> getOperator() { - return (DualInputOperator<?, ?, ?, ?>) super.getOperator(); - } - - /** - * Gets the <tt>DagConnection</tt> through which this node receives its <i>first</i> input. - * - * @return The first input connection. - */ - public DagConnection getFirstIncomingConnection() { - return this.input1; - } - - /** - * Gets the <tt>DagConnection</tt> through which this node receives its <i>second</i> input. - * - * @return The second input connection. 
- */ - public DagConnection getSecondIncomingConnection() { - return this.input2; - } - - public OptimizerNode getFirstPredecessorNode() { - if(this.input1 != null) { - return this.input1.getSource(); - } else { - return null; - } - } - - public OptimizerNode getSecondPredecessorNode() { - if(this.input2 != null) { - return this.input2.getSource(); - } else { - return null; - } - } - - @Override - public List<DagConnection> getIncomingConnections() { - ArrayList<DagConnection> inputs = new ArrayList<DagConnection>(2); - inputs.add(input1); - inputs.add(input2); - return inputs; - } - - - @Override - public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExecutionMode) { - // see if there is a hint that dictates which shipping strategy to use for BOTH inputs - final Configuration conf = getOperator().getParameters(); - ShipStrategyType preSet1 = null; - ShipStrategyType preSet2 = null; - - String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null); - if (shipStrategy != null) { - if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { - preSet1 = preSet2 = ShipStrategyType.FORWARD; - } else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { - preSet1 = preSet2 = ShipStrategyType.BROADCAST; - } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { - preSet1 = preSet2 = ShipStrategyType.PARTITION_HASH; - } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { - preSet1 = preSet2 = ShipStrategyType.PARTITION_RANGE; - } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { - preSet1 = preSet2 = ShipStrategyType.PARTITION_RANDOM; - } else { - throw new CompilerException("Unknown hint for shipping strategy: " + shipStrategy); - } - } - - // see if there is a hint that dictates which shipping strategy to use for the FIRST input - shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, null); - if (shipStrategy != null) { - if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { - preSet1 = ShipStrategyType.FORWARD; - } else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { - preSet1 = ShipStrategyType.BROADCAST; - } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { - preSet1 = ShipStrategyType.PARTITION_HASH; - } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { - preSet1 = ShipStrategyType.PARTITION_RANGE; - } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { - preSet1 = ShipStrategyType.PARTITION_RANDOM; - } else { - throw new CompilerException("Unknown hint for shipping strategy of input one: " + shipStrategy); - } - } - - // see if there is a hint that dictates which shipping strategy to use for the SECOND input - shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, null); - if (shipStrategy != null) { - if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) { - preSet2 = ShipStrategyType.FORWARD; - } else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) { - preSet2 = ShipStrategyType.BROADCAST; - } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) { - preSet2 = ShipStrategyType.PARTITION_HASH; - } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) { - preSet2 = ShipStrategyType.PARTITION_RANGE; - } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { - preSet2 = 
ShipStrategyType.PARTITION_RANDOM; - } else { - throw new CompilerException("Unknown hint for shipping strategy of input two: " + shipStrategy); - } - } - - // get the predecessors - DualInputOperator<?, ?, ?, ?> contr = getOperator(); - - Operator<?> leftPred = contr.getFirstInput(); - Operator<?> rightPred = contr.getSecondInput(); - - OptimizerNode pred1; - DagConnection conn1; - if (leftPred == null) { - throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input set for first input."); - } else { - pred1 = contractToNode.get(leftPred); - conn1 = new DagConnection(pred1, this, defaultExecutionMode); - if (preSet1 != null) { - conn1.setShipStrategy(preSet1); - } - } - - // create the connection and add it - this.input1 = conn1; - pred1.addOutgoingConnection(conn1); - - OptimizerNode pred2; - DagConnection conn2; - if (rightPred == null) { - throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input set for second input."); - } else { - pred2 = contractToNode.get(rightPred); - conn2 = new DagConnection(pred2, this, defaultExecutionMode); - if (preSet2 != null) { - conn2.setShipStrategy(preSet2); - } - } - - // create the connection and add it - this.input2 = conn2; - pred2.addOutgoingConnection(conn2); - } - - protected abstract List<OperatorDescriptorDual> getPossibleProperties(); - - private List<OperatorDescriptorDual> getProperties() { - if (this.cachedDescriptors == null) { - this.cachedDescriptors = getPossibleProperties(); - } - return this.cachedDescriptors; - } - - @Override - public void computeInterestingPropertiesForInputs(CostEstimator estimator) { - // get what we inherit and what is preserved by our user code - final InterestingProperties props1 = getInterestingProperties().filterByCodeAnnotations(this, 0); - final InterestingProperties props2 = getInterestingProperties().filterByCodeAnnotations(this, 1); - - // add all properties relevant to this node - for (OperatorDescriptorDual dpd : getProperties()) { - for (GlobalPropertiesPair gp : dpd.getPossibleGlobalProperties()) { - // input 1 - props1.addGlobalProperties(gp.getProperties1()); - - // input 2 - props2.addGlobalProperties(gp.getProperties2()); - } - for (LocalPropertiesPair lp : dpd.getPossibleLocalProperties()) { - // input 1 - props1.addLocalProperties(lp.getProperties1()); - - // input 2 - props2.addLocalProperties(lp.getProperties2()); - } - } - this.input1.setInterestingProperties(props1); - this.input2.setInterestingProperties(props2); - - for (DagConnection conn : getBroadcastConnections()) { - conn.setInterestingProperties(new InterestingProperties()); - } - } - - @Override - public List<PlanNode> getAlternativePlans(CostEstimator estimator) { - // check if we have a cached version - if (this.cachedPlans != null) { - return this.cachedPlans; - } - - boolean childrenSkippedDueToReplicatedInput = false; - - // step down to all producer nodes and calculate alternative plans - final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator); - final List<? extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator); - - // calculate alternative sub-plans for predecessor - final Set<RequestedGlobalProperties> intGlobal1 = this.input1.getInterestingProperties().getGlobalProperties(); - final Set<RequestedGlobalProperties> intGlobal2 = this.input2.getInterestingProperties().getGlobalProperties(); - - // calculate alternative sub-plans for broadcast inputs - final List<Set<? 
extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>(); - List<DagConnection> broadcastConnections = getBroadcastConnections(); - List<String> broadcastConnectionNames = getBroadcastConnectionNames(); - - for (int i = 0; i < broadcastConnections.size(); i++ ) { - DagConnection broadcastConnection = broadcastConnections.get(i); - String broadcastConnectionName = broadcastConnectionNames.get(i); - List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator); - - // wrap the plan candidates in named channels - HashSet<NamedChannel> broadcastChannels = new HashSet<NamedChannel>(broadcastPlanCandidates.size()); - for (PlanNode plan: broadcastPlanCandidates) { - final NamedChannel c = new NamedChannel(broadcastConnectionName, plan); - DataExchangeMode exMode = DataExchangeMode.select(broadcastConnection.getDataExchangeMode(), - ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline()); - c.setShipStrategy(ShipStrategyType.BROADCAST, exMode); - broadcastChannels.add(c); - } - broadcastPlanChannels.add(broadcastChannels); - } - - final GlobalPropertiesPair[] allGlobalPairs; - final LocalPropertiesPair[] allLocalPairs; - { - Set<GlobalPropertiesPair> pairsGlob = new HashSet<GlobalPropertiesPair>(); - Set<LocalPropertiesPair> pairsLoc = new HashSet<LocalPropertiesPair>(); - for (OperatorDescriptorDual ods : getProperties()) { - pairsGlob.addAll(ods.getPossibleGlobalProperties()); - pairsLoc.addAll(ods.getPossibleLocalProperties()); - } - allGlobalPairs = pairsGlob.toArray(new GlobalPropertiesPair[pairsGlob.size()]); - allLocalPairs = pairsLoc.toArray(new LocalPropertiesPair[pairsLoc.size()]); - } - - final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>(); - - final ExecutionMode input1Mode = this.input1.getDataExchangeMode(); - final ExecutionMode input2Mode = this.input2.getDataExchangeMode(); - - final int dop = getParallelism(); - final int inDop1 = getFirstPredecessorNode().getParallelism(); - final int inDop2 = getSecondPredecessorNode().getParallelism(); - - final boolean dopChange1 = dop != inDop1; - final boolean dopChange2 = dop != inDop2; - - final boolean input1breaksPipeline = this.input1.isBreakingPipeline(); - final boolean input2breaksPipeline = this.input2.isBreakingPipeline(); - - // enumerate all pairwise combinations of the children's plans together with - // all possible operator strategy combinations - - // create all candidates - for (PlanNode child1 : subPlans1) { - - if (child1.getGlobalProperties().isFullyReplicated()) { - // fully replicated input is always locally forwarded if DOP is not changed - if (dopChange1) { - // cannot continue with this child - childrenSkippedDueToReplicatedInput = true; - continue; - } else { - this.input1.setShipStrategy(ShipStrategyType.FORWARD); - } - } - - for (PlanNode child2 : subPlans2) { - - if (child2.getGlobalProperties().isFullyReplicated()) { - // fully replicated input is always locally forwarded if DOP is not changed - if (dopChange2) { - // cannot continue with this child - childrenSkippedDueToReplicatedInput = true; - continue; - } else { - this.input2.setShipStrategy(ShipStrategyType.FORWARD); - } - } - - // check that the children go together. That is the case if they build upon the same - // candidate at the joined branch of the plan. - if (!areBranchCompatible(child1, child2)) { - continue; - } - - for (RequestedGlobalProperties igps1: intGlobal1) { - // create a candidate channel for the first input. 
mark it cached, if the connection says so - final Channel c1 = new Channel(child1, this.input1.getMaterializationMode()); - if (this.input1.getShipStrategy() == null) { - // free to choose the ship strategy - igps1.parameterizeChannel(c1, dopChange1, input1Mode, input1breaksPipeline); - - // if the DOP changed, make sure that we cancel out properties, unless the - // ship strategy preserves/establishes them even under changing DOPs - if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) { - c1.getGlobalProperties().reset(); - } - } - else { - // ship strategy fixed by compiler hint - ShipStrategyType shipType = this.input1.getShipStrategy(); - DataExchangeMode exMode = DataExchangeMode.select(input1Mode, shipType, input1breaksPipeline); - if (this.keys1 != null) { - c1.setShipStrategy(shipType, this.keys1.toFieldList(), exMode); - } - else { - c1.setShipStrategy(shipType, exMode); - } - - if (dopChange1) { - c1.adjustGlobalPropertiesForFullParallelismChange(); - } - } - - for (RequestedGlobalProperties igps2: intGlobal2) { - // create a candidate channel for the second input. mark it cached, if the connection says so - final Channel c2 = new Channel(child2, this.input2.getMaterializationMode()); - if (this.input2.getShipStrategy() == null) { - // free to choose the ship strategy - igps2.parameterizeChannel(c2, dopChange2, input2Mode, input2breaksPipeline); - - // if the DOP changed, make sure that we cancel out properties, unless the - // ship strategy preserves/establishes them even under changing DOPs - if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) { - c2.getGlobalProperties().reset(); - } - } else { - // ship strategy fixed by compiler hint - ShipStrategyType shipType = this.input2.getShipStrategy(); - DataExchangeMode exMode = DataExchangeMode.select(input2Mode, shipType, input2breaksPipeline); - if (this.keys2 != null) { - c2.setShipStrategy(shipType, this.keys2.toFieldList(), exMode); - } else { - c2.setShipStrategy(shipType, exMode); - } - - if (dopChange2) { - c2.adjustGlobalPropertiesForFullParallelismChange(); - } - } - - /* ******************************************************************** - * NOTE: Depending on how we proceed with different partitioning, - * we might at some point need a compatibility check between - * the pairs of global properties. - * *******************************************************************/ - - outer: - for (GlobalPropertiesPair gpp : allGlobalPairs) { - if (gpp.getProperties1().isMetBy(c1.getGlobalProperties()) && - gpp.getProperties2().isMetBy(c2.getGlobalProperties()) ) - { - for (OperatorDescriptorDual desc : getProperties()) { - if (desc.areCompatible(gpp.getProperties1(), gpp.getProperties2(), - c1.getGlobalProperties(), c2.getGlobalProperties())) - { - Channel c1Clone = c1.clone(); - c1Clone.setRequiredGlobalProps(gpp.getProperties1()); - c2.setRequiredGlobalProps(gpp.getProperties2()); - - // we form a valid combination, so create the local candidates - // for this - addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2, - outputPlans, allLocalPairs, estimator); - break outer; - } - } - } - } - - // break the loop over input2's possible global properties, if the property - // is fixed via a hint. All the properties are overridden by the hint anyway, - // so we can stop after the first - if (this.input2.getShipStrategy() != null) { - break; - } - } - - // break the loop over input1's possible global properties, if the property - // is fixed via a hint. 
All the properties are overridden by the hint anyway, - // so we can stop after the first - if (this.input1.getShipStrategy() != null) { - break; - } - } - } - } - - if(outputPlans.isEmpty()) { - if(childrenSkippedDueToReplicatedInput) { - throw new CompilerException("No plan meeting the requirements could be created @ " + this - + ". Most likely reason: Invalid use of replicated input."); - } else { - throw new CompilerException("No plan meeting the requirements could be created @ " + this - + ". Most likely reason: Too restrictive plan hints."); - } - } - - // cost and prune the plans - for (PlanNode node : outputPlans) { - estimator.costOperator(node); - } - prunePlanAlternatives(outputPlans); - outputPlans.trimToSize(); - - this.cachedPlans = outputPlans; - return outputPlans; - } - - protected void addLocalCandidates(Channel template1, Channel template2, List<Set<? extends NamedChannel>> broadcastPlanChannels, - RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2, - List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator) - { - for (RequestedLocalProperties ilp1 : this.input1.getInterestingProperties().getLocalProperties()) { - final Channel in1 = template1.clone(); - ilp1.parameterizeChannel(in1); - - for (RequestedLocalProperties ilp2 : this.input2.getInterestingProperties().getLocalProperties()) { - final Channel in2 = template2.clone(); - ilp2.parameterizeChannel(in2); - - for (OperatorDescriptorDual dps: getProperties()) { - for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) { - if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) && - lpp.getProperties2().isMetBy(in2.getLocalProperties()) ) - { - // valid combination - // for non-trivial local properties, we need to check that they are co-compatible - // (such as when some sort order is requested, that both require the same sort order) - if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(), - in1.getLocalProperties(), in2.getLocalProperties())) - { - // copy, because setting required properties and instantiation may - // change the channels and should not affect prior candidates - Channel in1Copy = in1.clone(); - in1Copy.setRequiredLocalProps(lpp.getProperties1()); - - Channel in2Copy = in2.clone(); - in2Copy.setRequiredLocalProps(lpp.getProperties2()); - - // all right, co-compatible - instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2); - break; - } - // else cannot use this pair, fall through the loop and try the next one - } - } - } - } - } - } - - protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel in2, - List<Set<? 
extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator, - RequestedGlobalProperties globPropsReq1, RequestedGlobalProperties globPropsReq2, - RequestedLocalProperties locPropsReq1, RequestedLocalProperties locPropsReq2) - { - final PlanNode inputSource1 = in1.getSource(); - final PlanNode inputSource2 = in2.getSource(); - - for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) { - - boolean validCombination = true; - - // check whether the broadcast inputs use the same plan candidate at the branching point - for (int i = 0; i < broadcastChannelsCombination.size(); i++) { - NamedChannel nc = broadcastChannelsCombination.get(i); - PlanNode bcSource = nc.getSource(); - - if (!(areBranchCompatible(bcSource, inputSource1) || areBranchCompatible(bcSource, inputSource2))) { - validCombination = false; - break; - } - - // check branch compatibility against all other broadcast variables - for (int k = 0; k < i; k++) { - PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource(); - - if (!areBranchCompatible(bcSource, otherBcSource)) { - validCombination = false; - break; - } - } - } - - if (!validCombination) { - continue; - } - - placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2); - - DualInputPlanNode node = operator.instantiate(in1, in2, this); - node.setBroadcastInputs(broadcastChannelsCombination); - - SemanticProperties props = this.getSemanticProperties(); - GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0); - GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1); - GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2); - - LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(props, 0); - LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(props, 1); - LocalProperties locals = operator.computeLocalProperties(lp1, lp2); - - node.initProperties(combined, locals); - node.updatePropertiesWithUniqueSets(getUniqueFields()); - target.add(node); - } - } - - protected void placePipelineBreakersIfNecessary(DriverStrategy strategy, Channel in1, Channel in2) { - // before we instantiate, check for deadlocks by tracing back to the open branches and checking - // whether either no input or all of them have a dam - if (this.hereJoinedBranches != null && this.hereJoinedBranches.size() > 0) { - boolean someDamOnLeftPaths = false; - boolean damOnAllLeftPaths = true; - boolean someDamOnRightPaths = false; - boolean damOnAllRightPaths = true; - - if (strategy.firstDam() == DamBehavior.FULL_DAM || in1.getLocalStrategy().dams() || in1.getTempMode().breaksPipeline()) { - someDamOnLeftPaths = true; - } else { - for (OptimizerNode brancher : this.hereJoinedBranches) { - PlanNode candAtBrancher = in1.getSource().getCandidateAtBranchPoint(brancher); - - // not all candidates are found, because this list includes joined branches from both regular inputs and broadcast vars - if (candAtBrancher == null) { - continue; - } - - SourceAndDamReport res = in1.getSource().hasDamOnPathDownTo(candAtBrancher); - if (res == NOT_FOUND) { - throw new CompilerException("Bug: Tracing dams for deadlock detection is broken."); - } else if (res == FOUND_SOURCE) { - damOnAllLeftPaths = false; - } else if (res == FOUND_SOURCE_AND_DAM) { - someDamOnLeftPaths = true; - } else { - throw new CompilerException(); - } - } - } - - if (strategy.secondDam() == 
DamBehavior.FULL_DAM || in2.getLocalStrategy().dams() || in2.getTempMode().breaksPipeline()) { - someDamOnRightPaths = true; - } else { - for (OptimizerNode brancher : this.hereJoinedBranches) { - PlanNode candAtBrancher = in2.getSource().getCandidateAtBranchPoint(brancher); - - // not all candidates are found, because this list includes joined branches from both regular inputs and broadcast vars - if (candAtBrancher == null) { - continue; - } - - SourceAndDamReport res = in2.getSource().hasDamOnPathDownTo(candAtBrancher); - if (res == NOT_FOUND) { - throw new CompilerException("Bug: Tracing dams for deadlock detection is broken."); - } else if (res == FOUND_SOURCE) { - damOnAllRightPaths = false; - } else if (res == FOUND_SOURCE_AND_DAM) { - someDamOnRightPaths = true; - } else { - throw new CompilerException(); - } - } - } - - // okay combinations are both all-dam or both no-dam - if ( (damOnAllLeftPaths & damOnAllRightPaths) | (!someDamOnLeftPaths & !someDamOnRightPaths) ) { - // good, either both materialize already on the way, or both fully pipeline - } else { - if (someDamOnLeftPaths & !damOnAllRightPaths) { - // right needs a pipeline breaker - in2.setTempMode(in2.getTempMode().makePipelineBreaker()); - } - - if (someDamOnRightPaths & !damOnAllLeftPaths) { - // left needs a pipeline breaker - in1.setTempMode(in1.getTempMode().makePipelineBreaker()); - } - } - } - } - - @Override - public void computeUnclosedBranchStack() { - if (this.openBranches != null) { - return; - } - - // handle the data flow branching for the regular inputs - addClosedBranches(getFirstPredecessorNode().closedBranchingNodes); - addClosedBranches(getSecondPredecessorNode().closedBranchingNodes); - - List<UnclosedBranchDescriptor> result1 = getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection()); - List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection()); - - ArrayList<UnclosedBranchDescriptor> inputsMerged = new ArrayList<UnclosedBranchDescriptor>(); - mergeLists(result1, result2, inputsMerged, true); - - // handle the data flow branching for the broadcast inputs - List<UnclosedBranchDescriptor> result = computeUnclosedBranchStackForBroadcastInputs(inputsMerged); - - this.openBranches = (result == null || result.isEmpty()) ? 
Collections.<UnclosedBranchDescriptor>emptyList() : result; - } - - @Override - public SemanticProperties getSemanticProperties() { - return getOperator().getSemanticProperties(); - } - - // -------------------------------------------------------------------------------------------- - // Miscellaneous - // -------------------------------------------------------------------------------------------- - - @Override - public void accept(Visitor<OptimizerNode> visitor) { - if (visitor.preVisit(this)) { - if (this.input1 == null || this.input2 == null) { - throw new CompilerException(); - } - - getFirstPredecessorNode().accept(visitor); - getSecondPredecessorNode().accept(visitor); - - for (DagConnection connection : getBroadcastConnections()) { - connection.getSource().accept(visitor); - } - - visitor.postVisit(this); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java deleted file mode 100644 index 45ecdac..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.dag; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.common.operators.SemanticProperties; -import org.apache.flink.api.common.operators.SingleInputSemanticProperties; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; - - -public class UnaryOperatorNode extends SingleInputNode { - - private final List<OperatorDescriptorSingle> operator; - - private final String name; - - - - public UnaryOperatorNode(String name, FieldSet keys, OperatorDescriptorSingle ... 
operators) { - this(name, keys, Arrays.asList(operators)); - } - - public UnaryOperatorNode(String name, FieldSet keys, List<OperatorDescriptorSingle> operators) { - super(keys); - - this.operator = operators; - this.name = name; - } - - @Override - protected List<OperatorDescriptorSingle> getPossibleProperties() { - return this.operator; - } - - @Override - public String getName() { - return this.name; - } - - @Override - public SemanticProperties getSemanticProperties() { - return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); - } - - @Override - protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { - // we have no estimates by default - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java deleted file mode 100644 index e85f289..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java +++ /dev/null @@ -1,589 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.optimizer.dag; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import org.apache.flink.api.common.ExecutionMode; -import org.apache.flink.api.common.operators.SemanticProperties; -import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; -import org.apache.flink.api.common.operators.base.DeltaIterationBase; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.common.typeinfo.NothingTypeInfo; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor; -import org.apache.flink.optimizer.costs.CostEstimator; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.InterestingProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.operators.OperatorDescriptorDual; -import org.apache.flink.optimizer.operators.SolutionSetDeltaOperator; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.NamedChannel; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SolutionSetPlanNode; -import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; -import org.apache.flink.optimizer.plan.WorksetPlanNode; -import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport; -import org.apache.flink.optimizer.util.NoOpBinaryUdfOp; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.types.Nothing; -import org.apache.flink.util.Visitor; - -/** - * A node in the optimizer's program representation for a workset iteration. - */ -public class WorksetIterationNode extends TwoInputNode implements IterationNode { - - private static final int DEFAULT_COST_WEIGHT = 20; - - - private final FieldList solutionSetKeyFields; - - private final GlobalProperties partitionedProperties; - - private final List<OperatorDescriptorDual> dataProperties; - - private SolutionSetNode solutionSetNode; - - private WorksetNode worksetNode; - - private OptimizerNode solutionSetDelta; - - private OptimizerNode nextWorkset; - - private DagConnection solutionSetDeltaRootConnection; - - private DagConnection nextWorksetRootConnection; - - private SingleRootJoiner singleRoot; - - private boolean solutionDeltaImmediatelyAfterSolutionJoin; - - private final int costWeight; - - // -------------------------------------------------------------------------------------------- - - /** - * Creates a new workset (delta) iteration node for the optimizer plan. - * - * @param iteration The iteration operator that the node represents. 
- */ - public WorksetIterationNode(DeltaIterationBase<?, ?> iteration) { - super(iteration); - - final int[] ssKeys = iteration.getSolutionSetKeyFields(); - if (ssKeys == null || ssKeys.length == 0) { - throw new CompilerException("Invalid WorksetIteration: No key fields defined for the solution set."); - } - this.solutionSetKeyFields = new FieldList(ssKeys); - this.partitionedProperties = new GlobalProperties(); - this.partitionedProperties.setHashPartitioned(this.solutionSetKeyFields); - - int weight = iteration.getMaximumNumberOfIterations() > 0 ? - iteration.getMaximumNumberOfIterations() : DEFAULT_COST_WEIGHT; - - if (weight > OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT) { - weight = OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT; - } - this.costWeight = weight; - - this.dataProperties = Collections.<OperatorDescriptorDual>singletonList(new WorksetOpDescriptor(this.solutionSetKeyFields)); - } - - // -------------------------------------------------------------------------------------------- - - public DeltaIterationBase<?, ?> getIterationContract() { - return (DeltaIterationBase<?, ?>) getOperator(); - } - - public SolutionSetNode getSolutionSetNode() { - return this.solutionSetNode; - } - - public WorksetNode getWorksetNode() { - return this.worksetNode; - } - - public OptimizerNode getNextWorkset() { - return this.nextWorkset; - } - - public OptimizerNode getSolutionSetDelta() { - return this.solutionSetDelta; - } - - public void setPartialSolution(SolutionSetNode solutionSetNode, WorksetNode worksetNode) { - if (this.solutionSetNode != null || this.worksetNode != null) { - throw new IllegalStateException("Error: Initializing WorksetIterationNode multiple times."); - } - this.solutionSetNode = solutionSetNode; - this.worksetNode = worksetNode; - } - - public void setNextPartialSolution(OptimizerNode solutionSetDelta, OptimizerNode nextWorkset, - ExecutionMode executionMode) { - - // check whether the next partial solution is itself the join with - // the partial solution (so we can potentially do direct updates) - if (solutionSetDelta instanceof TwoInputNode) { - TwoInputNode solutionDeltaTwoInput = (TwoInputNode) solutionSetDelta; - if (solutionDeltaTwoInput.getFirstPredecessorNode() == this.solutionSetNode || - solutionDeltaTwoInput.getSecondPredecessorNode() == this.solutionSetNode) - { - this.solutionDeltaImmediatelyAfterSolutionJoin = true; - } - } - - // there needs to be at least one node in the workset path, so - // if the next workset is equal to the workset, we need to inject a no-op node - if (nextWorkset == worksetNode || nextWorkset instanceof BinaryUnionNode) { - NoOpNode noop = new NoOpNode(); - noop.setDegreeOfParallelism(getParallelism()); - - DagConnection noOpConn = new DagConnection(nextWorkset, noop, executionMode); - noop.setIncomingConnection(noOpConn); - nextWorkset.addOutgoingConnection(noOpConn); - - nextWorkset = noop; - } - - // attach an extra node to the solution set delta for the cases where we need to repartition - UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(), - new SolutionSetDeltaOperator(getSolutionSetKeyFields())); - solutionSetDeltaUpdateAux.setDegreeOfParallelism(getParallelism()); - - DagConnection conn = new DagConnection(solutionSetDelta, solutionSetDeltaUpdateAux, executionMode); - solutionSetDeltaUpdateAux.setIncomingConnection(conn); - solutionSetDelta.addOutgoingConnection(conn); - - this.solutionSetDelta = solutionSetDeltaUpdateAux; - this.nextWorkset = nextWorkset; - - 
this.singleRoot = new SingleRootJoiner(); - this.solutionSetDeltaRootConnection = new DagConnection(solutionSetDeltaUpdateAux, - this.singleRoot, executionMode); - - this.nextWorksetRootConnection = new DagConnection(nextWorkset, this.singleRoot, executionMode); - this.singleRoot.setInputs(this.solutionSetDeltaRootConnection, this.nextWorksetRootConnection); - - solutionSetDeltaUpdateAux.addOutgoingConnection(this.solutionSetDeltaRootConnection); - nextWorkset.addOutgoingConnection(this.nextWorksetRootConnection); - } - - public int getCostWeight() { - return this.costWeight; - } - - public TwoInputNode getSingleRootOfStepFunction() { - return this.singleRoot; - } - - public FieldList getSolutionSetKeyFields() { - return this.solutionSetKeyFields; - } - - public OptimizerNode getInitialSolutionSetPredecessorNode() { - return getFirstPredecessorNode(); - } - - public OptimizerNode getInitialWorksetPredecessorNode() { - return getSecondPredecessorNode(); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public String getName() { - return "Workset Iteration"; - } - - @Override - public SemanticProperties getSemanticProperties() { - return new EmptySemanticProperties(); - } - - protected void readStubAnnotations() {} - - @Override - protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { - this.estimatedOutputSize = getFirstPredecessorNode().getEstimatedOutputSize(); - this.estimatedNumRecords = getFirstPredecessorNode().getEstimatedNumRecords(); - } - - // -------------------------------------------------------------------------------------------- - // Properties and Optimization - // -------------------------------------------------------------------------------------------- - - @Override - protected List<OperatorDescriptorDual> getPossibleProperties() { - return this.dataProperties; - } - - @Override - public void computeInterestingPropertiesForInputs(CostEstimator estimator) { - // our own solution (the solution set) is always partitioned and this cannot be adjusted - // depending on what the successor to the workset iteration requests. for that reason, - // we ignore incoming interesting properties. - - // in addition, we need to make 2 interesting property passes, because the root of the step function - // that computes the next workset needs the interesting properties as generated by the - // workset source of the step function. the second pass concerns only the workset path. 
- // as initial interesting properties, we have the trivial ones for the step function, - // and partitioned on the solution set key for the solution set delta - - RequestedGlobalProperties partitionedProperties = new RequestedGlobalProperties(); - partitionedProperties.setHashPartitioned(this.solutionSetKeyFields); - InterestingProperties partitionedIP = new InterestingProperties(); - partitionedIP.addGlobalProperties(partitionedProperties); - partitionedIP.addLocalProperties(new RequestedLocalProperties()); - - this.nextWorksetRootConnection.setInterestingProperties(new InterestingProperties()); - this.solutionSetDeltaRootConnection.setInterestingProperties(partitionedIP.clone()); - - InterestingPropertyVisitor ipv = new InterestingPropertyVisitor(estimator); - this.nextWorkset.accept(ipv); - this.solutionSetDelta.accept(ipv); - - // take the interesting properties of the partial solution and add them to the root interesting properties - InterestingProperties worksetIntProps = this.worksetNode.getInterestingProperties(); - InterestingProperties intProps = new InterestingProperties(); - intProps.getGlobalProperties().addAll(worksetIntProps.getGlobalProperties()); - intProps.getLocalProperties().addAll(worksetIntProps.getLocalProperties()); - - // clear all interesting properties to prepare the second traversal - this.nextWorksetRootConnection.clearInterestingProperties(); - this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE); - - // 2nd pass - this.nextWorksetRootConnection.setInterestingProperties(intProps); - this.nextWorkset.accept(ipv); - - // now add the interesting properties of the workset to the workset input - final InterestingProperties inProps = this.worksetNode.getInterestingProperties().clone(); - inProps.addGlobalProperties(new RequestedGlobalProperties()); - inProps.addLocalProperties(new RequestedLocalProperties()); - this.input2.setInterestingProperties(inProps); - - // the partial solution must be hash partitioned, so it has only that as interesting properties - this.input1.setInterestingProperties(partitionedIP); - } - - @Override - public void clearInterestingProperties() { - super.clearInterestingProperties(); - - this.nextWorksetRootConnection.clearInterestingProperties(); - this.solutionSetDeltaRootConnection.clearInterestingProperties(); - - this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE); - this.solutionSetDelta.accept(InterestingPropertiesClearer.INSTANCE); - } - - @Override - protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn, - List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator, - RequestedGlobalProperties globPropsReqSolutionSet, RequestedGlobalProperties globPropsReqWorkset, - RequestedLocalProperties locPropsReqSolutionSet, RequestedLocalProperties locPropsReqWorkset) - { - // check for pipeline breaking using hash join with build on the solution set side - placePipelineBreakersIfNecessary(DriverStrategy.HYBRIDHASH_BUILD_FIRST, solutionSetIn, worksetIn); - - // NOTES ON THE ENUMERATION OF THE STEP FUNCTION PLANS: - // Whenever we instantiate the iteration, we enumerate new candidates for the step function. - // That way, we make sure we have an appropriate plan for each candidate for the initial partial solution, - // we have a fitting candidate for the step function (often, work is pushed out of the step function). 
- // Among the candidates of the step function, we keep only those that meet the requested properties of the - // current candidate initial partial solution. That makes sure these properties exist at the beginning of - // every iteration. - - // 1) Because we enumerate multiple times, we may need to clean the cached plans - // before starting another enumeration - this.nextWorkset.accept(PlanCacheCleaner.INSTANCE); - this.solutionSetDelta.accept(PlanCacheCleaner.INSTANCE); - - // 2) Give the partial solution the properties of the current candidate for the initial partial solution - // This concerns currently only the workset. - this.worksetNode.setCandidateProperties(worksetIn.getGlobalProperties(), worksetIn.getLocalProperties(), worksetIn); - this.solutionSetNode.setCandidateProperties(this.partitionedProperties, new LocalProperties(), solutionSetIn); - - final SolutionSetPlanNode sspn = this.solutionSetNode.getCurrentSolutionSetPlanNode(); - final WorksetPlanNode wspn = this.worksetNode.getCurrentWorksetPlanNode(); - - // 3) Get the alternative plans - List<PlanNode> solutionSetDeltaCandidates = this.solutionSetDelta.getAlternativePlans(estimator); - List<PlanNode> worksetCandidates = this.nextWorkset.getAlternativePlans(estimator); - - // 4) Throw away all that are not compatible with the properties currently requested to the - // initial partial solution - - // Make sure that the workset candidates fulfill the input requirements - { - List<PlanNode> newCandidates = new ArrayList<PlanNode>(); - - for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) { - PlanNode candidate = planDeleter.next(); - - GlobalProperties atEndGlobal = candidate.getGlobalProperties(); - LocalProperties atEndLocal = candidate.getLocalProperties(); - - FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(wspn, - atEndGlobal, atEndLocal); - - if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) { - ; // depends only through broadcast variable on the workset solution - } - else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) { - // attach a no-op node through which we create the properties of the original input - Channel toNoOp = new Channel(candidate); - globPropsReqWorkset.parameterizeChannel(toNoOp, false, - nextWorksetRootConnection.getDataExchangeMode(), false); - locPropsReqWorkset.parameterizeChannel(toNoOp); - - UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties", - FieldList.EMPTY_LIST); - - rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getParallelism()); - - SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode( - rebuildWorksetPropertiesNode, "Rebuild Workset Properties", - toNoOp, DriverStrategy.UNARY_NO_OP); - rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), - toNoOp.getLocalProperties()); - estimator.costOperator(rebuildWorksetPropertiesPlanNode); - - GlobalProperties atEndGlobalModified = rebuildWorksetPropertiesPlanNode.getGlobalProperties(); - LocalProperties atEndLocalModified = rebuildWorksetPropertiesPlanNode.getLocalProperties(); - - if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) { - FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet( - wspn, atEndGlobalModified, atEndLocalModified); - if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) { - 
newCandidates.add(rebuildWorksetPropertiesPlanNode); - } - } - - // remove the original operator and add the modified candidate - planDeleter.remove(); - - } - } - - worksetCandidates.addAll(newCandidates); - } - - if (worksetCandidates.isEmpty()) { - return; - } - - // sanity check the solution set delta - for (PlanNode solutionSetDeltaCandidate : solutionSetDeltaCandidates) { - SingleInputPlanNode candidate = (SingleInputPlanNode) solutionSetDeltaCandidate; - GlobalProperties gp = candidate.getGlobalProperties(); - - if (gp.getPartitioning() != PartitioningProperty.HASH_PARTITIONED || gp.getPartitioningFields() == null || - !gp.getPartitioningFields().equals(this.solutionSetKeyFields)) { - throw new CompilerException("Bug: The solution set delta is not partitioned."); - } - } - - // 5) Create a candidate for the Iteration Node for every remaining plan of the step function. - - final GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(this.solutionSetKeyFields); - gp.addUniqueFieldCombination(this.solutionSetKeyFields); - - LocalProperties lp = LocalProperties.EMPTY.addUniqueFields(this.solutionSetKeyFields); - - // take all combinations of solution set delta and workset plans - for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) { - for (PlanNode worksetCandidate : worksetCandidates) { - // check whether they have the same operator at their latest branching point - if (this.singleRoot.areBranchCompatible(solutionSetCandidate, worksetCandidate)) { - - SingleInputPlanNode siSolutionDeltaCandidate = (SingleInputPlanNode) solutionSetCandidate; - boolean immediateDeltaUpdate; - - // check whether we need a dedicated solution set delta operator, or whether we can update on the fly - if (siSolutionDeltaCandidate.getInput().getShipStrategy() == ShipStrategyType.FORWARD && - this.solutionDeltaImmediatelyAfterSolutionJoin) - { - // we do not need this extra node. we can make the predecessor the delta - // sanity check the node and connection - if (siSolutionDeltaCandidate.getDriverStrategy() != DriverStrategy.UNARY_NO_OP || - siSolutionDeltaCandidate.getInput().getLocalStrategy() != LocalStrategy.NONE) - { - throw new CompilerException("Invalid Solution set delta node."); - } - - solutionSetCandidate = siSolutionDeltaCandidate.getInput().getSource(); - immediateDeltaUpdate = true; - } else { - // was not partitioned, we need to keep this node. 
- // mark that we materialize the input - siSolutionDeltaCandidate.getInput().setTempMode(TempMode.PIPELINE_BREAKER); - immediateDeltaUpdate = false; - } - - WorksetIterationPlanNode wsNode = new WorksetIterationPlanNode(this, - "WorksetIteration ("+this.getOperator().getName()+")", solutionSetIn, - worksetIn, sspn, wspn, worksetCandidate, solutionSetCandidate); - wsNode.setImmediateSolutionSetUpdate(immediateDeltaUpdate); - wsNode.initProperties(gp, lp); - target.add(wsNode); - } - } - } - } - - @Override - public void computeUnclosedBranchStack() { - if (this.openBranches != null) { - return; - } - - // IMPORTANT: First compute closed branches from the two inputs - // we need to do this because the runtime iteration head effectively joins - addClosedBranches(getFirstPredecessorNode().closedBranchingNodes); - addClosedBranches(getSecondPredecessorNode().closedBranchingNodes); - - List<UnclosedBranchDescriptor> result1 = getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection()); - List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection()); - - ArrayList<UnclosedBranchDescriptor> inputsMerged1 = new ArrayList<UnclosedBranchDescriptor>(); - mergeLists(result1, result2, inputsMerged1, true); // this method also sets which branches are joined here (in the head) - - addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes); - - ArrayList<UnclosedBranchDescriptor> inputsMerged2 = new ArrayList<UnclosedBranchDescriptor>(); - List<UnclosedBranchDescriptor> result3 = getSingleRootOfStepFunction().openBranches; - mergeLists(inputsMerged1, result3, inputsMerged2, true); - - // handle the data flow branching for the broadcast inputs - List<UnclosedBranchDescriptor> result = computeUnclosedBranchStackForBroadcastInputs(inputsMerged2); - - this.openBranches = (result == null || result.isEmpty()) ? 
Collections.<UnclosedBranchDescriptor>emptyList() : result; - } - - // -------------------------------------------------------------------------------------------- - // Iteration Specific Traversals - // -------------------------------------------------------------------------------------------- - - public void acceptForStepFunction(Visitor<OptimizerNode> visitor) { - this.singleRoot.accept(visitor); - } - - // -------------------------------------------------------------------------------------------- - // Utility Classes - // -------------------------------------------------------------------------------------------- - - private static final class WorksetOpDescriptor extends OperatorDescriptorDual { - - private WorksetOpDescriptor(FieldList solutionSetKeys) { - super(solutionSetKeys, null); - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.NONE; - } - - @Override - protected List<GlobalPropertiesPair> createPossibleGlobalProperties() { - RequestedGlobalProperties partitionedGp = new RequestedGlobalProperties(); - partitionedGp.setHashPartitioned(this.keys1); - return Collections.singletonList(new GlobalPropertiesPair(partitionedGp, new RequestedGlobalProperties())); - } - - @Override - protected List<LocalPropertiesPair> createPossibleLocalProperties() { - // all properties are possible - return Collections.singletonList(new LocalPropertiesPair( - new RequestedLocalProperties(), new RequestedLocalProperties())); - } - - @Override - public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, - GlobalProperties produced1, GlobalProperties produced2) { - return true; - } - - @Override - public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, - LocalProperties produced1, LocalProperties produced2) { - return true; - } - - @Override - public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - throw new UnsupportedOperationException(); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { - throw new UnsupportedOperationException(); - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - throw new UnsupportedOperationException(); - } - } - - public static class SingleRootJoiner extends TwoInputNode { - - SingleRootJoiner() { - super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo())); - - setDegreeOfParallelism(1); - } - - public void setInputs(DagConnection input1, DagConnection input2) { - this.input1 = input1; - this.input2 = input2; - } - - @Override - public String getName() { - return "Internal Utility Node"; - } - - @Override - protected List<OperatorDescriptorDual> getPossibleProperties() { - return Collections.emptyList(); - } - - @Override - protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { - // no estimates are needed here - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java deleted file mode 100644 index 3b05aba..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the 
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
deleted file mode 100644
index 3b05aba..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-
-/**
- * The optimizer's internal representation of the workset that is input to a workset (delta) iteration.
- */
-public class WorksetNode extends AbstractPartialSolutionNode {
-
-    private final WorksetIterationNode iterationNode;
-
-
-    public WorksetNode(WorksetPlaceHolder<?> psph, WorksetIterationNode iterationNode) {
-        super(psph);
-        this.iterationNode = iterationNode;
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
-        if (this.cachedPlans != null) {
-            throw new IllegalStateException();
-        } else {
-            WorksetPlanNode wspn = new WorksetPlanNode(this, "Workset ("+this.getOperator().getName()+")", gProps, lProps, initialInput);
-            this.cachedPlans = Collections.<PlanNode>singletonList(wspn);
-        }
-    }
-
-    public WorksetPlanNode getCurrentWorksetPlanNode() {
-        if (this.cachedPlans != null) {
-            return (WorksetPlanNode) this.cachedPlans.get(0);
-        } else {
-            throw new IllegalStateException();
-        }
-    }
-
-    public WorksetIterationNode getIterationNode() {
-        return this.iterationNode;
-    }
-
-    @Override
-    public void computeOutputEstimates(DataStatistics statistics) {
-        copyEstimates(this.iterationNode.getInitialWorksetPredecessorNode());
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * Gets the contract object for this workset node.
-     *
-     * @return The contract.
-     */
-    @Override
-    public WorksetPlaceHolder<?> getOperator() {
-        return (WorksetPlaceHolder<?>) super.getOperator();
-    }
-
-    @Override
-    public String getName() {
-        return "Workset";
-    }
-
-    @Override
-    public void computeUnclosedBranchStack() {
-        if (this.openBranches != null) {
-            return;
-        }
-
-        DagConnection worksetInput = this.iterationNode.getSecondIncomingConnection();
-        OptimizerNode worksetSource = worksetInput.getSource();
-
-        addClosedBranches(worksetSource.closedBranchingNodes);
-        List<UnclosedBranchDescriptor> fromInput = worksetSource.getBranchesForParent(worksetInput);
-        this.openBranches = (fromInput == null || fromInput.isEmpty()) ?
-                Collections.<UnclosedBranchDescriptor>emptyList() : fromInput;
-    }
-}
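WorksetNode handles plan candidates differently from regular nodes: there is exactly one candidate, installed once via setCandidateProperties(...) and looked up via getCurrentWorksetPlanNode(), with both methods guarding the ordering through IllegalStateException. A sketch of the intended call sequence — the surrounding variables are placeholders, not code from this commit:

    // During plan enumeration of the enclosing delta iteration:
    worksetNode.setCandidateProperties(gProps, lProps, initialWorksetChannel); // exactly once

    // While optimizing the step function, the current candidate is retrieved:
    WorksetPlanNode current = worksetNode.getCurrentWorksetPlanNode();

    // A second setCandidateProperties(...) call, or a lookup before the first
    // call, fails with IllegalStateException by design.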
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
deleted file mode 100644
index 57ba29d..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ /dev/null
@@ -1,500 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class represents global properties of the data at a certain point in the plan.
- * Global properties are properties that describe data across different partitions, such as
- * whether the data is hash partitioned, range partitioned, replicated, etc.
- */
-public class GlobalProperties implements Cloneable {
-
-    public static final Logger LOG = LoggerFactory.getLogger(GlobalProperties.class);
-
-    private PartitioningProperty partitioning;    // the type of partitioning
-
-    private FieldList partitioningFields;         // the fields which are partitioned
-
-    private Ordering ordering;                    // order of the partitioned fields, if it is an ordered (range) partitioning
-
-    private Set<FieldSet> uniqueFieldCombinations;
-
-    private Partitioner<?> customPartitioner;
-
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * Initializes the global properties with no partitioning.
-     */
-    public GlobalProperties() {
-        this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * Sets these global properties to represent a hash partitioning.
-     *
-     * @param partitionedFields The key fields on which the data is hash partitioned.
-     */
-    public void setHashPartitioned(FieldList partitionedFields) {
-        if (partitionedFields == null) {
-            throw new NullPointerException();
-        }
-
-        this.partitioning = PartitioningProperty.HASH_PARTITIONED;
-        this.partitioningFields = partitionedFields;
-        this.ordering = null;
-    }
-
-
-    public void setRangePartitioned(Ordering ordering) {
-        if (ordering == null) {
-            throw new NullPointerException();
-        }
-
-        this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
-        this.ordering = ordering;
-        this.partitioningFields = ordering.getInvolvedIndexes();
-    }
-
-    public void setAnyPartitioning(FieldList partitionedFields) {
-        if (partitionedFields == null) {
-            throw new NullPointerException();
-        }
-
-        this.partitioning = PartitioningProperty.ANY_PARTITIONING;
-        this.partitioningFields = partitionedFields;
-        this.ordering = null;
-    }
-
-    public void setRandomPartitioned() {
-        this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
-        this.partitioningFields = null;
-        this.ordering = null;
-    }
-
-    public void setFullyReplicated() {
-        this.partitioning = PartitioningProperty.FULL_REPLICATION;
-        this.partitioningFields = null;
-        this.ordering = null;
-    }
-
-    public void setForcedRebalanced() {
-        this.partitioning = PartitioningProperty.FORCED_REBALANCED;
-        this.partitioningFields = null;
-        this.ordering = null;
-    }
-
-    public void setCustomPartitioned(FieldList partitionedFields, Partitioner<?> partitioner) {
-        if (partitionedFields == null || partitioner == null) {
-            throw new NullPointerException();
-        }
-
-        this.partitioning = PartitioningProperty.CUSTOM_PARTITIONING;
-        this.partitioningFields = partitionedFields;
-        this.ordering = null;
-        this.customPartitioner = partitioner;
-    }
-
-    public void addUniqueFieldCombination(FieldSet fields) {
-        if (fields == null) {
-            return;
-        }
-        if (this.uniqueFieldCombinations == null) {
-            this.uniqueFieldCombinations = new HashSet<FieldSet>();
-        }
-        this.uniqueFieldCombinations.add(fields);
-    }
-
-    public void clearUniqueFieldCombinations() {
-        if (this.uniqueFieldCombinations != null) {
-            this.uniqueFieldCombinations = null;
-        }
-    }
-
-    public Set<FieldSet> getUniqueFieldCombination() {
-        return this.uniqueFieldCombinations;
-    }
-
-    public FieldList getPartitioningFields() {
-        return this.partitioningFields;
-    }
-
-    public Ordering getPartitioningOrdering() {
-        return this.ordering;
-    }
-
-    public PartitioningProperty getPartitioning() {
-        return this.partitioning;
-    }
-
-    public Partitioner<?> getCustomPartitioner() {
-        return this.customPartitioner;
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    public boolean isPartitionedOnFields(FieldSet fields) {
-        if (this.partitioning.isPartitionedOnKey() && fields.isValidSubset(this.partitioningFields)) {
-            return true;
-        } else if (this.uniqueFieldCombinations != null) {
-            for (FieldSet set : this.uniqueFieldCombinations) {
-                if (fields.isValidSubset(set)) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return false;
-        }
-    }
-
-    public boolean isExactlyPartitionedOnFields(FieldList fields) {
-        return this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields);
-    }
-
-    public boolean matchesOrderedPartitioning(Ordering o) {
-        if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {
-            if (this.ordering.getNumberOfFields() > o.getNumberOfFields()) {
-                return false;
-            }
-
-            for (int i = 0; i < this.ordering.getNumberOfFields(); i++) {
-                if (this.ordering.getFieldNumber(i) != o.getFieldNumber(i)) {
-                    return false;
-                }
-
-                // if this one requests no order, everything is good
-                final Order oo = o.getOrder(i);
-                final Order to = this.ordering.getOrder(i);
-                if (oo != Order.NONE) {
-                    if (oo == Order.ANY) {
-                        // if any order is requested, any not NONE order is good
-                        if (to == Order.NONE) {
-                            return false;
-                        }
-                    } else if (oo != to) {
-                        // the orders must be equal
-                        return false;
-                    }
-                }
-            }
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    public boolean isFullyReplicated() {
-        return this.partitioning == PartitioningProperty.FULL_REPLICATION;
-    }
-
-    /**
-     * Checks if the properties in this object are trivial, i.e. only standard values.
-     */
-    public boolean isTrivial() {
-        return partitioning == PartitioningProperty.RANDOM_PARTITIONED;
-    }
-
-    /**
-     * This method resets the properties to a state where no properties are given.
-     */
-    public void reset() {
-        this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
-        this.ordering = null;
-        this.partitioningFields = null;
-    }
-
-    /**
-     * Filters these GlobalProperties by the fields that are forwarded to the output
-     * as described by the SemanticProperties.
-     *
-     * @param props The semantic properties holding information about forwarded fields.
-     * @param input The index of the input.
-     * @return The filtered GlobalProperties
-     */
-    public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
-
-        if (props == null) {
-            throw new NullPointerException("SemanticProperties may not be null.");
-        }
-
-        GlobalProperties gp = new GlobalProperties();
-
-        // filter partitioning
-        switch(this.partitioning) {
-            case RANGE_PARTITIONED:
-                // check if ordering is preserved
-                Ordering newOrdering = new Ordering();
-                for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) {
-                    int sourceField = this.ordering.getInvolvedIndexes().get(i);
-                    FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
-
-                    if (targetField == null || targetField.size() == 0) {
-                        // partitioning is destroyed
-                        newOrdering = null;
-                        break;
-                    } else {
-                        // use any field of target fields for now. We should use something like field equivalence sets in the future.
-                        if(targetField.size() > 1) {
-                            LOG.warn("Found that a field is forwarded to more than one target field in " +
-                                    "semantic forwarded field information. Will only use the field with the lowest index.");
-                        }
-                        newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), this.ordering.getOrder(i));
-                    }
-                }
-                if(newOrdering != null) {
-                    gp.partitioning = PartitioningProperty.RANGE_PARTITIONED;
-                    gp.ordering = newOrdering;
-                    gp.partitioningFields = newOrdering.getInvolvedIndexes();
-                }
-                break;
-            case HASH_PARTITIONED:
-            case ANY_PARTITIONING:
-            case CUSTOM_PARTITIONING:
-                FieldList newPartitioningFields = new FieldList();
-                for (int sourceField : this.partitioningFields) {
-                    FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
-
-                    if (targetField == null || targetField.size() == 0) {
-                        newPartitioningFields = null;
-                        break;
-                    } else {
-                        // use any field of target fields for now. We should use something like field equivalence sets in the future.
-                        if(targetField.size() > 1) {
-                            LOG.warn("Found that a field is forwarded to more than one target field in " +
-                                    "semantic forwarded field information. Will only use the field with the lowest index.");
-                        }
-                        newPartitioningFields = newPartitioningFields.addField(targetField.toArray()[0]);
-                    }
-                }
-                if(newPartitioningFields != null) {
-                    gp.partitioning = this.partitioning;
-                    gp.partitioningFields = newPartitioningFields;
-                    gp.customPartitioner = this.customPartitioner;
-                }
-                break;
-            case FORCED_REBALANCED:
-            case FULL_REPLICATION:
-            case RANDOM_PARTITIONED:
-                gp.partitioning = this.partitioning;
-                break;
-            default:
-                throw new RuntimeException("Unknown partitioning type.");
-        }
-
-        // filter unique field combinations
-        if (this.uniqueFieldCombinations != null) {
-            Set<FieldSet> newUniqueFieldCombinations = new HashSet<FieldSet>();
-            for (FieldSet fieldCombo : this.uniqueFieldCombinations) {
-                FieldSet newFieldCombo = new FieldSet();
-                for (Integer sourceField : fieldCombo) {
-                    FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
-
-                    if (targetField == null || targetField.size() == 0) {
-                        newFieldCombo = null;
-                        break;
-                    } else {
-                        // use any field of target fields for now. We should use something like field equivalence sets in the future.
-                        if(targetField.size() > 1) {
-                            LOG.warn("Found that a field is forwarded to more than one target field in " +
-                                    "semantic forwarded field information. Will only use the field with the lowest index.");
-                        }
-                        newFieldCombo = newFieldCombo.addField(targetField.toArray()[0]);
-                    }
-                }
-                if (newFieldCombo != null) {
-                    newUniqueFieldCombinations.add(newFieldCombo);
-                }
-            }
-            if(!newUniqueFieldCombinations.isEmpty()) {
-                gp.uniqueFieldCombinations = newUniqueFieldCombinations;
-            }
-        }
-
-        return gp;
-    }
-
-
-    public void parameterizeChannel(Channel channel, boolean globalDopChange,
-                                    ExecutionMode exchangeMode, boolean breakPipeline) {
-
-        ShipStrategyType shipType;
-        FieldList partitionKeys;
-        boolean[] sortDirection;
-        Partitioner<?> partitioner;
-
-        switch (this.partitioning) {
-            case RANDOM_PARTITIONED:
-                shipType = globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD;
-                partitionKeys = null;
-                sortDirection = null;
-                partitioner = null;
-                break;
-
-            case FULL_REPLICATION:
-                shipType = ShipStrategyType.BROADCAST;
-                partitionKeys = null;
-                sortDirection = null;
-                partitioner = null;
-                break;
-
-            case ANY_PARTITIONING:
-            case HASH_PARTITIONED:
-                shipType = ShipStrategyType.PARTITION_HASH;
-                partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
-                sortDirection = null;
-                partitioner = null;
-                break;
-
-            case RANGE_PARTITIONED:
-                shipType = ShipStrategyType.PARTITION_RANGE;
-                partitionKeys = this.ordering.getInvolvedIndexes();
-                sortDirection = this.ordering.getFieldSortDirections();
-                partitioner = null;
-                break;
-
-            case FORCED_REBALANCED:
-                shipType = ShipStrategyType.PARTITION_RANDOM;
-                partitionKeys = null;
-                sortDirection = null;
-                partitioner = null;
-                break;
-
-            case CUSTOM_PARTITIONING:
-                shipType = ShipStrategyType.PARTITION_CUSTOM;
-                partitionKeys = this.partitioningFields;
-                sortDirection = null;
-                partitioner = this.customPartitioner;
-                break;
-
-            default:
-                throw new CompilerException("Unsupported partitioning strategy");
-        }
-
-        DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline);
-        channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode);
-    }
-
-    // ------------------------------------------------------------------------
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((partitioning == null) ? 0 : partitioning.ordinal());
-        result = prime * result + ((partitioningFields == null) ? 0 : partitioningFields.hashCode());
-        result = prime * result + ((ordering == null) ? 0 : ordering.hashCode());
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj != null && obj instanceof GlobalProperties) {
-            final GlobalProperties other = (GlobalProperties) obj;
-            return (this.partitioning == other.partitioning)
-                && (this.ordering == other.ordering || (this.ordering != null && this.ordering.equals(other.ordering)))
-                && (this.partitioningFields == other.partitioningFields ||
-                    (this.partitioningFields != null && this.partitioningFields.equals(other.partitioningFields)))
-                && (this.uniqueFieldCombinations == other.uniqueFieldCombinations ||
-                    (this.uniqueFieldCombinations != null && this.uniqueFieldCombinations.equals(other.uniqueFieldCombinations)));
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder bld = new StringBuilder(
-            "GlobalProperties [partitioning=" + partitioning +
-            (this.partitioningFields == null ? "" : ", on fields " + this.partitioningFields) +
-            (this.ordering == null ? "" : ", with ordering " + this.ordering));
-
-        if (this.uniqueFieldCombinations == null) {
-            bld.append(']');
-        } else {
-            bld.append(" - Unique field groups: ");
-            bld.append(this.uniqueFieldCombinations);
-            bld.append(']');
-        }
-        return bld.toString();
-    }
-
-    @Override
-    public GlobalProperties clone() {
-        final GlobalProperties newProps = new GlobalProperties();
-        newProps.partitioning = this.partitioning;
-        newProps.partitioningFields = this.partitioningFields;
-        newProps.ordering = this.ordering;
-        newProps.customPartitioner = this.customPartitioner;
-        newProps.uniqueFieldCombinations = this.uniqueFieldCombinations == null ?
-                null : new HashSet<FieldSet>(this.uniqueFieldCombinations);
-        return newProps;
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    public static GlobalProperties combine(GlobalProperties gp1, GlobalProperties gp2) {
-        if (gp1.isFullyReplicated()) {
-            if (gp2.isFullyReplicated()) {
-                return new GlobalProperties();
-            } else {
-                return gp2;
-            }
-        } else if (gp2.isFullyReplicated()) {
-            return gp1;
-        } else if (gp1.ordering != null) {
-            return gp1;
-        } else if (gp2.ordering != null) {
-            return gp2;
-        } else if (gp1.partitioningFields != null) {
-            return gp1;
-        } else if (gp2.partitioningFields != null) {
-            return gp2;
-        } else if (gp1.uniqueFieldCombinations != null) {
-            return gp1;
-        } else if (gp2.uniqueFieldCombinations != null) {
-            return gp2;
-        } else if (gp1.getPartitioning().isPartitioned()) {
-            return gp1;
-        } else if (gp2.getPartitioning().isPartitioned()) {
-            return gp2;
-        } else {
-            return gp1;
-        }
-    }
-}
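The subtlest of the removed GlobalProperties methods is filterBySemanticProperties: it maps each partitioning key through the operator's forwarded-field information and silently downgrades to RANDOM_PARTITIONED when any key is not forwarded. A minimal sketch, assuming SingleInputSemanticProperties and its addForwardedField(int, int) method from org.apache.flink.api.common.operators — the concrete field numbers are illustrative:

    GlobalProperties gp = new GlobalProperties();
    gp.setHashPartitioned(new FieldList(0));          // input is hash-partitioned on field 0

    SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
    semProps.addForwardedField(0, 2);                 // operator forwards field 0 to field 2

    // The output is still hash-partitioned, but now on the renamed field:
    GlobalProperties filtered = gp.filterBySemanticProperties(semProps, 0);
    // filtered.getPartitioningFields() -> [2]

    // Without the forwarding entry, field 0 would have no target and the
    // result would fall back to RANDOM_PARTITIONED.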