http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java new file mode 100644 index 0000000..df05b64 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.optimizer.plan;

import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;

import java.util.Collections;
import java.util.HashMap;

import org.apache.flink.optimizer.costs.Costs;
import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
import org.apache.flink.optimizer.dag.OptimizerNode;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.runtime.operators.DamBehavior;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.util.Visitor;

/**
 * Plan candidate representing the partial solution of a bulk iteration.
 *
 * <p>The node reports no inputs and no predecessors. It carries zero cost, and it copies the
 * branch plan of the node that produces the iteration's initial input.
 */
public class BulkPartialSolutionPlanNode extends PlanNode {

	/** Shared zero-cost instance; the partial solution is treated as free. */
	private static final Costs NO_COSTS = new Costs();

	/** The iteration plan node this partial solution belongs to; assigned via the setter. */
	private BulkIterationPlanNode containingIterationNode;

	/** Channel feeding the iteration's initial input; consulted for branch plans and dam checks. */
	private Channel initialInput;

	/** Free slot for post-pass code (public, not read by this class). */
	public Object postPassHelper;


	/**
	 * Creates the plan node for a bulk iteration's partial solution.
	 *
	 * @param template The optimizer node this plan candidate is derived from.
	 * @param nodeName The name of the node.
	 * @param gProps The global properties of the partial solution.
	 * @param lProps The local properties of the partial solution.
	 * @param initialInput The channel that feeds the iteration's initial input.
	 */
	public BulkPartialSolutionPlanNode(BulkPartialSolutionNode template, String nodeName,
			GlobalProperties gProps, LocalProperties lProps,
			Channel initialInput)
	{
		super(template, nodeName, DriverStrategy.NONE);

		this.initialInput = initialInput;
		this.globalProps = gProps;
		this.localProps = lProps;

		// reading the partial solution is free of cost
		this.cumulativeCosts = NO_COSTS;
		this.nodeCosts = NO_COSTS;

		// adopt any open branches of the node that produces the initial input
		final PlanNode upstream = initialInput.getSource();
		final boolean upstreamHasBranches = upstream.branchPlan != null && !upstream.branchPlan.isEmpty();
		if (upstreamHasBranches) {
			if (this.branchPlan == null) {
				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
			}
			this.branchPlan.putAll(upstream.branchPlan);
		}
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Returns the template of this plan node, cast to its concrete optimizer node type.
	 *
	 * @return The bulk partial solution optimizer node.
	 */
	public BulkPartialSolutionNode getPartialSolutionNode() {
		return (BulkPartialSolutionNode) this.template;
	}

	/**
	 * Returns the iteration plan node that contains this partial solution.
	 *
	 * @return The containing iteration node, or {@code null} if it was never set.
	 */
	public BulkIterationPlanNode getContainingIterationNode() {
		return this.containingIterationNode;
	}

	/**
	 * Sets the iteration plan node that contains this partial solution.
	 *
	 * @param containingIterationNode The containing iteration node.
	 */
	public void setContainingIterationNode(BulkIterationPlanNode containingIterationNode) {
		this.containingIterationNode = containingIterationNode;
	}

	// --------------------------------------------------------------------------------------------

	@Override
	public void accept(Visitor<PlanNode> visitor) {
		// there are no inputs to descend into: post-visit directly if the visitor wants to proceed
		final boolean proceed = visitor.preVisit(this);
		if (proceed) {
			visitor.postVisit(this);
		}
	}

	@Override
	public Iterable<PlanNode> getPredecessors() {
		return Collections.<PlanNode>emptyList();
	}

	@Override
	public Iterable<Channel> getInputs() {
		return Collections.<Channel>emptyList();
	}

	/**
	 * Checks whether {@code source} is reachable through the initial input and whether a dam lies
	 * on that path.
	 *
	 * <p>A dam is reported when the upstream path already contains one. It is also reported when
	 * the initial input channel's local strategy dams, when its temp mode breaks the pipeline, or
	 * when the first input of the driver strategy is a full dam.
	 */
	@Override
	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
		if (source == this) {
			return FOUND_SOURCE;
		}

		final SourceAndDamReport upstreamReport = this.initialInput.getSource().hasDamOnPathDownTo(source);
		if (upstreamReport == FOUND_SOURCE_AND_DAM) {
			return FOUND_SOURCE_AND_DAM;
		}
		if (upstreamReport != FOUND_SOURCE) {
			return NOT_FOUND;
		}

		final boolean damsHere = this.initialInput.getLocalStrategy().dams()
				|| this.initialInput.getTempMode().breaksPipeline()
				|| getDriverStrategy().firstDam() == DamBehavior.FULL_DAM;
		return damsHere ? FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java new file mode 100644 index 0000000..875d1c3 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.optimizer.plan;

import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.dag.EstimateProvider;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plandump.DumpableConnection;
import org.apache.flink.optimizer.util.Utils;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;

/**
 * A Channel represents the result produced by an operator and the data exchange
 * before the consumption by the target operator.
 *
 * <p>The channel defines and tracks various properties and characteristics of the
 * data set and data exchange.
 *
 * <p>Data set characteristics:
 * <ul>
 *     <li>The "global properties" of the data, i.e., how the data is distributed across
 *         partitions</li>
 *     <li>The "required global properties" of the data, i.e., the global properties that, if absent,
 *         would cause the program to return a wrong result.</li>
 *     <li>The "local properties" of the data, i.e., how the data is organized within a partition</li>
 *     <li>The "required local properties" of the data, i.e., the local properties that, if absent,
 *         would cause the program to return a wrong result.</li>
 * </ul>
 *
 * <p>Data exchange parameters:
 * <ul>
 *     <li>The "ship strategy", i.e., whether to forward the data, shuffle it, broadcast it, ...</li>
 *     <li>The "ship keys", which are the positions of the key fields in the exchanged records.</li>
 *     <li>The "data exchange mode", which defines whether to pipeline or batch the exchange</li>
 *     <li>Several more...</li>
 * </ul>
 */
public class Channel implements EstimateProvider, Cloneable, DumpableConnection<PlanNode> {

	private PlanNode source;

	private PlanNode target;

	private ShipStrategyType shipStrategy = ShipStrategyType.NONE;

	private DataExchangeMode dataExchangeMode;

	private LocalStrategy localStrategy = LocalStrategy.NONE;

	private FieldList shipKeys;

	private FieldList localKeys;

	private boolean[] shipSortOrder;

	private boolean[] localSortOrder;

	private RequestedGlobalProperties requiredGlobalProps;

	private RequestedLocalProperties requiredLocalProps;

	/** Lazily derived global properties; reset by {@code setShipStrategy}. */
	private GlobalProperties globalProps;

	/** Lazily derived local properties; reset by {@code setLocalStrategy}. */
	private LocalProperties localProps;

	private TypeSerializerFactory<?> serializer;

	private TypeComparatorFactory<?> shipStrategyComparator;

	private TypeComparatorFactory<?> localStrategyComparator;

	private DataDistribution dataDistribution;

	private Partitioner<?> partitioner;

	private TempMode tempMode;

	private double relativeTempMemory;

	private double relativeMemoryLocalStrategy;

	private int replicationFactor = 1;

	// --------------------------------------------------------------------------------------------

	/**
	 * Creates a channel from the given source node without temping.
	 *
	 * @param sourceNode The node producing the data of this channel.
	 */
	public Channel(PlanNode sourceNode) {
		this(sourceNode, null);
	}

	/**
	 * Creates a channel from the given source node with the given temp mode.
	 *
	 * @param sourceNode The node producing the data of this channel.
	 * @param tempMode The temp mode; {@code null} is treated as {@link TempMode#NONE}.
	 */
	public Channel(PlanNode sourceNode, TempMode tempMode) {
		this.source = sourceNode;
		this.tempMode = (tempMode == null ? TempMode.NONE : tempMode);
	}

	// --------------------------------------------------------------------------------------------
	//                                         Accessors
	// --------------------------------------------------------------------------------------------

	/**
	 * Gets the source of this Channel.
	 *
	 * @return The source.
	 */
	@Override
	public PlanNode getSource() {
		return this.source;
	}

	/**
	 * Sets the target of this Channel.
	 *
	 * @param target The target.
	 */
	public void setTarget(PlanNode target) {
		this.target = target;
	}

	/**
	 * Gets the target of this Channel.
	 *
	 * @return The target.
	 */
	public PlanNode getTarget() {
		return this.target;
	}

	public void setShipStrategy(ShipStrategyType strategy, DataExchangeMode dataExchangeMode) {
		setShipStrategy(strategy, null, null, null, dataExchangeMode);
	}

	public void setShipStrategy(ShipStrategyType strategy, FieldList keys, DataExchangeMode dataExchangeMode) {
		setShipStrategy(strategy, keys, null, null, dataExchangeMode);
	}

	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
			boolean[] sortDirection, DataExchangeMode dataExchangeMode) {
		setShipStrategy(strategy, keys, sortDirection, null, dataExchangeMode);
	}

	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
			Partitioner<?> partitioner, DataExchangeMode dataExchangeMode) {
		setShipStrategy(strategy, keys, null, partitioner, dataExchangeMode);
	}

	/**
	 * Sets the ship strategy and its parameters, and discards any cached global properties.
	 *
	 * @param strategy The ship strategy.
	 * @param keys The ship keys, may be {@code null}.
	 * @param sortDirection The sort directions of the ship keys, may be {@code null}.
	 * @param partitioner A custom partitioner, may be {@code null}.
	 * @param dataExchangeMode The data exchange mode.
	 */
	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
			boolean[] sortDirection, Partitioner<?> partitioner,
			DataExchangeMode dataExchangeMode) {
		this.shipStrategy = strategy;
		this.shipKeys = keys;
		this.shipSortOrder = sortDirection;
		this.partitioner = partitioner;
		this.dataExchangeMode = dataExchangeMode;
		this.globalProps = null;		// reset the global properties
	}

	/**
	 * Gets the data exchange mode (batch / streaming) to use for the data
	 * exchange of this channel.
	 *
	 * @return The data exchange mode of this channel.
	 */
	public DataExchangeMode getDataExchangeMode() {
		return dataExchangeMode;
	}

	public ShipStrategyType getShipStrategy() {
		return this.shipStrategy;
	}

	public FieldList getShipStrategyKeys() {
		return this.shipKeys;
	}

	public boolean[] getShipStrategySortOrder() {
		return this.shipSortOrder;
	}

	public void setLocalStrategy(LocalStrategy strategy) {
		setLocalStrategy(strategy, null, null);
	}

	/**
	 * Sets the local strategy and its parameters, and discards any cached local properties.
	 *
	 * @param strategy The local strategy.
	 * @param keys The local strategy keys, may be {@code null}.
	 * @param sortDirection The sort directions of the keys, may be {@code null}.
	 */
	public void setLocalStrategy(LocalStrategy strategy, FieldList keys, boolean[] sortDirection) {
		this.localStrategy = strategy;
		this.localKeys = keys;
		this.localSortOrder = sortDirection;
		this.localProps = null;		// reset the local properties
	}

	public LocalStrategy getLocalStrategy() {
		return this.localStrategy;
	}

	public FieldList getLocalStrategyKeys() {
		return this.localKeys;
	}

	public boolean[] getLocalStrategySortOrder() {
		return this.localSortOrder;
	}

	public void setDataDistribution(DataDistribution dataDistribution) {
		this.dataDistribution = dataDistribution;
	}

	public DataDistribution getDataDistribution() {
		return this.dataDistribution;
	}

	public Partitioner<?> getPartitioner() {
		return partitioner;
	}

	public TempMode getTempMode() {
		return this.tempMode;
	}

	/**
	 * Sets the temp mode of the connection.
	 *
	 * @param tempMode
	 *        The temp mode of the connection.
	 */
	public void setTempMode(TempMode tempMode) {
		this.tempMode = tempMode;
	}

	/**
	 * Gets the memory for materializing the channel's result from this Channel.
	 *
	 * @return The temp memory.
	 */
	public double getRelativeTempMemory() {
		return this.relativeTempMemory;
	}

	/**
	 * Sets the memory for materializing the channel's result from this Channel.
	 *
	 * @param relativeTempMemory The memory for materialization.
	 */
	public void setRelativeTempMemory(double relativeTempMemory) {
		this.relativeTempMemory = relativeTempMemory;
	}

	/**
	 * Sets the replication factor of the connection.
	 *
	 * @param factor The replication factor of the connection.
	 */
	public void setReplicationFactor(int factor) {
		this.replicationFactor = factor;
	}

	/**
	 * Returns the replication factor of the connection.
	 *
	 * @return The replication factor of the connection.
	 */
	public int getReplicationFactor() {
		return this.replicationFactor;
	}

	/**
	 * Gets the serializer from this Channel.
	 *
	 * @return The serializer.
	 */
	public TypeSerializerFactory<?> getSerializer() {
		return serializer;
	}

	/**
	 * Sets the serializer for this Channel.
	 *
	 * @param serializer The serializer to set.
	 */
	public void setSerializer(TypeSerializerFactory<?> serializer) {
		this.serializer = serializer;
	}

	/**
	 * Gets the ship strategy comparator from this Channel.
	 *
	 * @return The ship strategy comparator.
	 */
	public TypeComparatorFactory<?> getShipStrategyComparator() {
		return shipStrategyComparator;
	}

	/**
	 * Sets the ship strategy comparator for this Channel.
	 *
	 * @param shipStrategyComparator The ship strategy comparator to set.
	 */
	public void setShipStrategyComparator(TypeComparatorFactory<?> shipStrategyComparator) {
		this.shipStrategyComparator = shipStrategyComparator;
	}

	/**
	 * Gets the local strategy comparator from this Channel.
	 *
	 * @return The local strategy comparator.
	 */
	public TypeComparatorFactory<?> getLocalStrategyComparator() {
		return localStrategyComparator;
	}

	/**
	 * Sets the local strategy comparator for this Channel.
	 *
	 * @param localStrategyComparator The local strategy comparator to set.
	 */
	public void setLocalStrategyComparator(TypeComparatorFactory<?> localStrategyComparator) {
		this.localStrategyComparator = localStrategyComparator;
	}

	public double getRelativeMemoryLocalStrategy() {
		return relativeMemoryLocalStrategy;
	}

	public void setRelativeMemoryLocalStrategy(double relativeMemoryLocalStrategy) {
		this.relativeMemoryLocalStrategy = relativeMemoryLocalStrategy;
	}

	public boolean isOnDynamicPath() {
		return this.source.isOnDynamicPath();
	}

	public int getCostWeight() {
		return this.source.getCostWeight();
	}

	// --------------------------------------------------------------------------------------------
	//                                Statistic Estimates
	// --------------------------------------------------------------------------------------------


	/**
	 * Estimates the output size of the channel as the source's estimate times the replication
	 * factor. Negative (unknown) estimates are passed through unchanged.
	 */
	@Override
	public long getEstimatedOutputSize() {
		long estimate = this.source.template.getEstimatedOutputSize();
		return estimate < 0 ? estimate : estimate * this.replicationFactor;
	}

	/**
	 * Estimates the number of records as the source's estimate times the replication factor.
	 * Negative (unknown) estimates are passed through unchanged.
	 */
	@Override
	public long getEstimatedNumRecords() {
		long estimate =  this.source.template.getEstimatedNumRecords();
		return estimate < 0 ? estimate : estimate * this.replicationFactor;
	}

	@Override
	public float getEstimatedAvgWidthPerOutputRecord() {
		return this.source.template.getEstimatedAvgWidthPerOutputRecord();
	}

	// --------------------------------------------------------------------------------------------
	//                                Data Property Handling
	// --------------------------------------------------------------------------------------------


	public RequestedGlobalProperties getRequiredGlobalProps() {
		return requiredGlobalProps;
	}

	public void setRequiredGlobalProps(RequestedGlobalProperties requiredGlobalProps) {
		this.requiredGlobalProps = requiredGlobalProps;
	}

	public RequestedLocalProperties getRequiredLocalProps() {
		return requiredLocalProps;
	}

	public void setRequiredLocalProps(RequestedLocalProperties requiredLocalProps) {
		this.requiredLocalProps = requiredLocalProps;
	}

	/**
	 * Gets the global properties of the data after it has been shipped through this channel.
	 *
	 * <p>The properties are derived from a clone of the source's global properties, adjusted for
	 * the ship strategy. The result is cached until {@code setShipStrategy} is called again.
	 *
	 * @return The global properties after shipping.
	 * @throws CompilerException If the ship strategy has not been set or is not recognized.
	 */
	public GlobalProperties getGlobalProperties() {
		if (this.globalProps == null) {
			// Derive into a local and cache only on success. Assigning the field before a
			// failing switch would make later calls silently return the unadjusted clone.
			final GlobalProperties props = this.source.getGlobalProperties().clone();
			switch (this.shipStrategy) {
				case BROADCAST:
					props.clearUniqueFieldCombinations();
					props.setFullyReplicated();
					break;
				case PARTITION_HASH:
					props.setHashPartitioned(this.shipKeys);
					break;
				case PARTITION_RANGE:
					props.setRangePartitioned(Utils.createOrdering(this.shipKeys, this.shipSortOrder));
					break;
				case FORWARD:
					break;
				case PARTITION_RANDOM:
					props.reset();
					break;
				case PARTITION_FORCED_REBALANCE:
					props.setForcedRebalanced();
					break;
				case PARTITION_CUSTOM:
					props.setCustomPartitioned(this.shipKeys, this.partitioner);
					break;
				case NONE:
					throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set.");
				default:
					throw new CompilerException("Unknown ShipStrategy: " + this.shipStrategy);
			}
			this.globalProps = props;
		}

		return this.globalProps;
	}

	/**
	 * Gets the local properties of the data after shipping and after the local strategy.
	 *
	 * <p>The result is cached until {@code setLocalStrategy} is called again.
	 *
	 * @return The local properties of the data as seen by the target.
	 * @throws CompilerException If the ship strategy has not been set or is unknown, or if the
	 *                           local strategy is not supported for channels.
	 */
	public LocalProperties getLocalProperties() {
		if (this.localProps == null) {
			// Derive into a local and cache only on success (see getGlobalProperties).
			LocalProperties props = computeLocalPropertiesAfterShippingOnly();
			switch (this.localStrategy) {
				case NONE:
					break;
				case SORT:
				case COMBININGSORT:
					props = LocalProperties.forOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
					break;
				default:
					throw new CompilerException("Unsupported local strategy for channel.");
			}
			this.localProps = props;
		}

		return this.localProps;
	}

	/**
	 * Computes the local properties that result from the ship strategy alone.
	 *
	 * <p>Re-partitioning or broadcasting strategies destroy any local organization. FORWARD
	 * retains the source's local properties.
	 *
	 * @return The local properties after shipping.
	 * @throws CompilerException If the ship strategy has not been set or is unknown.
	 */
	private LocalProperties computeLocalPropertiesAfterShippingOnly() {
		switch (this.shipStrategy) {
			case BROADCAST:
			case PARTITION_HASH:
			case PARTITION_CUSTOM:
			case PARTITION_RANGE:
			case PARTITION_RANDOM:
			case PARTITION_FORCED_REBALANCE:
				return new LocalProperties();
			case FORWARD:
				return this.source.getLocalProperties();
			case NONE:
				throw new CompilerException("ShipStrategy has not yet been set.");
			default:
				throw new CompilerException("Unknown ShipStrategy.");
		}
	}

	/**
	 * Verifies that the ship strategy is valid between operators of different parallelism, and
	 * makes sure the global properties have been derived.
	 *
	 * @throws IllegalStateException If the ship strategy has not been set yet.
	 * @throws CompilerException If the ship strategy is FORWARD, which cannot bridge a change in
	 *                           parallelism, or if the strategy is not recognized.
	 */
	public void adjustGlobalPropertiesForFullParallelismChange() {
		if (this.shipStrategy == null || this.shipStrategy == ShipStrategyType.NONE) {
			throw new IllegalStateException("Cannot adjust channel for degree of parallelism " +
					"change before the ship strategy is set.");
		}

		// make sure the properties are acquired
		if (this.globalProps == null) {
			getGlobalProperties();
		}

		// some strategies globally reestablish properties
		switch (this.shipStrategy) {
			case FORWARD:
				throw new CompilerException("Cannot use FORWARD strategy between operations " +
						"with different number of parallel instances.");
			case NONE: // excluded by sanity check. left here for verification check completion
			case BROADCAST:
			case PARTITION_HASH:
			case PARTITION_RANGE:
			case PARTITION_RANDOM:
			case PARTITION_FORCED_REBALANCE:
			case PARTITION_CUSTOM:
				return;
		}
		throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy);
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Utility method used while swapping binary union nodes for n-ary union nodes.
	 *
	 * @param newUnionNode The node that replaces this channel's binary union source.
	 * @throws IllegalStateException If the current source is not a {@link BinaryUnionPlanNode}.
	 */
	public void swapUnionNodes(PlanNode newUnionNode) {
		if (!(this.source instanceof BinaryUnionPlanNode)) {
			throw new IllegalStateException(
					"Can only swap union nodes when the channel's source is a BinaryUnionPlanNode, but it is: "
					+ this.source);
		} else {
			this.source = newUnionNode;
		}
	}

	// --------------------------------------------------------------------------------------------

	public int getMaxDepth() {
		return this.source.getOptimizerNode().getMaxDepth() + 1;
	}

	// --------------------------------------------------------------------------------------------

	@Override
	public String toString() {
		return "Channel (" + this.source + (this.target == null ? ')' : ") -> (" + this.target + ')') +
				'[' + this.shipStrategy + "] [" + this.localStrategy + "] " +
				(this.tempMode == null || this.tempMode == TempMode.NONE ? "{NO-TEMP}" : this.tempMode);
	}

	/**
	 * Creates a shallow copy of this channel; referenced nodes, keys and properties are shared.
	 */
	@Override
	public Channel clone() {
		try {
			return (Channel) super.clone();
		} catch (CloneNotSupportedException cnsex) {
			throw new RuntimeException(cnsex);
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java new file mode 100644 index 0000000..01c56dd --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.optimizer.plan;

import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.optimizer.dag.OptimizerNode;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.runtime.operators.DamBehavior;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Visitor;

/**
 * Plan candidate node for an operator with two inputs.
 *
 * <p>Besides its two input channels, the node records the key fields and sort orders on which
 * the driver strategy operates. It also holds the comparator factories for the driver, which are
 * assigned after construction through the setters.
 */
public class DualInputPlanNode extends PlanNode {

	protected final Channel input1;
	protected final Channel input2;

	protected final FieldList keys1;
	protected final FieldList keys2;

	protected final boolean[] sortOrders;

	private TypeComparatorFactory<?> comparator1;
	private TypeComparatorFactory<?> comparator2;
	private TypePairComparatorFactory<?, ?> pairComparator;

	public Object postPassHelper1;
	public Object postPassHelper2;

	// --------------------------------------------------------------------------------------------

	/**
	 * Creates a two-input plan node whose driver strategy has no key fields or sort orders.
	 *
	 * @param template The optimizer node this plan candidate is derived from.
	 * @param nodeName The name of the node.
	 * @param input1 The first input channel.
	 * @param input2 The second input channel.
	 * @param driverStrategy The driver strategy of the node.
	 */
	public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2, DriverStrategy driverStrategy) {
		this(template, nodeName, input1, input2, driverStrategy, null, null, null);
	}

	/**
	 * Creates a two-input plan node with driver key fields. The sort orders are taken from
	 * {@code SingleInputPlanNode.getTrueArray(driverKeyFields1.size())} (presumably all
	 * {@code true}; verify against that helper).
	 *
	 * @param template The optimizer node this plan candidate is derived from.
	 * @param nodeName The name of the node.
	 * @param input1 The first input channel.
	 * @param input2 The second input channel.
	 * @param driverStrategy The driver strategy of the node.
	 * @param driverKeyFields1 The key fields of the first input; must not be {@code null}.
	 * @param driverKeyFields2 The key fields of the second input.
	 * @throws NullPointerException If {@code driverKeyFields1} is {@code null}.
	 */
	public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2,
			DriverStrategy driverStrategy, FieldList driverKeyFields1, FieldList driverKeyFields2)
	{
		this(template, nodeName, input1, input2, driverStrategy, driverKeyFields1, driverKeyFields2,
				SingleInputPlanNode.getTrueArray(driverKeyFields1.size()));
	}

	/**
	 * Creates a two-input plan node with driver key fields and explicit sort orders.
	 *
	 * <p>An input whose ship strategy is BROADCAST gets its replication factor set to this
	 * node's parallelism. The branch plans of both input sources are merged into this node.
	 *
	 * @param template The optimizer node this plan candidate is derived from.
	 * @param nodeName The name of the node.
	 * @param input1 The first input channel.
	 * @param input2 The second input channel.
	 * @param driverStrategy The driver strategy of the node.
	 * @param driverKeyFields1 The key fields of the first input, may be {@code null}.
	 * @param driverKeyFields2 The key fields of the second input, may be {@code null}.
	 * @param driverSortOrders The sort orders of the key fields, may be {@code null}.
	 */
	public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2, DriverStrategy driverStrategy,
			FieldList driverKeyFields1, FieldList driverKeyFields2, boolean[] driverSortOrders)
	{
		super(template, nodeName, driverStrategy);
		this.input1 = input1;
		this.input2 = input2;
		this.keys1 = driverKeyFields1;
		this.keys2 = driverKeyFields2;
		this.sortOrders = driverSortOrders;

		// a broadcast input is replicated once per parallel instance of this node
		if (this.input1.getShipStrategy() == ShipStrategyType.BROADCAST) {
			this.input1.setReplicationFactor(getParallelism());
		}
		if (this.input2.getShipStrategy() == ShipStrategyType.BROADCAST) {
			this.input2.setReplicationFactor(getParallelism());
		}

		mergeBranchPlanMaps(input1.getSource(), input2.getSource());
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Returns the template of this plan node as a {@link TwoInputNode}.
	 *
	 * @return The template, cast to {@link TwoInputNode}.
	 * @throws RuntimeException If the template is not a {@link TwoInputNode}.
	 */
	public TwoInputNode getTwoInputNode() {
		if (this.template instanceof TwoInputNode) {
			return (TwoInputNode) this.template;
		} else {
			throw new RuntimeException("The template of this DualInputPlanNode is not a TwoInputNode: "
					+ this.template);
		}
	}

	public FieldList getKeysForInput1() {
		return this.keys1;
	}

	public FieldList getKeysForInput2() {
		return this.keys2;
	}

	public boolean[] getSortOrders() {
		return this.sortOrders;
	}

	public TypeComparatorFactory<?> getComparator1() {
		return this.comparator1;
	}

	public TypeComparatorFactory<?> getComparator2() {
		return this.comparator2;
	}

	public void setComparator1(TypeComparatorFactory<?> comparator) {
		this.comparator1 = comparator;
	}

	public void setComparator2(TypeComparatorFactory<?> comparator) {
		this.comparator2 = comparator;
	}

	public TypePairComparatorFactory<?, ?> getPairComparator() {
		return this.pairComparator;
	}

	public void setPairComparator(TypePairComparatorFactory<?, ?> comparator) {
		this.pairComparator = comparator;
	}

	/**
	 * Gets the first input channel to this node.
	 *
	 * @return The first input channel to this node.
	 */
	public Channel getInput1() {
		return this.input1;
	}

	/**
	 * Gets the second input channel to this node.
	 *
	 * @return The second input channel to this node.
	 */
	public Channel getInput2() {
		return this.input2;
	}

	// --------------------------------------------------------------------------------------------


	/**
	 * Visits this node and, if the visitor's pre-visit returns {@code true}, both inputs and all
	 * broadcast inputs, before post-visiting this node.
	 */
	@Override
	public void accept(Visitor<PlanNode> visitor) {
		if (visitor.preVisit(this)) {
			this.input1.getSource().accept(visitor);
			this.input2.getSource().accept(visitor);

			for (Channel broadcastInput : getBroadcastInputs()) {
				broadcastInput.getSource().accept(visitor);
			}

			visitor.postVisit(this);
		}
	}


	/**
	 * Returns the sources of both inputs followed by the sources of all broadcast inputs.
	 */
	@Override
	public Iterable<PlanNode> getPredecessors() {
		if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
			return Arrays.asList(this.input1.getSource(), this.input2.getSource());
		} else {
			List<PlanNode> preds = new ArrayList<PlanNode>();

			preds.add(input1.getSource());
			preds.add(input2.getSource());

			for (Channel c : getBroadcastInputs()) {
				preds.add(c.getSource());
			}

			return preds;
		}
	}

	/**
	 * Returns the two regular input channels; broadcast inputs are not included.
	 */
	@Override
	public Iterable<Channel> getInputs() {
		return Arrays.asList(this.input1, this.input2);
	}


	/**
	 * Checks whether {@code source} is reachable from this node and whether a dam lies on the path.
	 *
	 * <p>The first input is examined first. If the source is found there, the second input is not
	 * examined. A path through a broadcast input is always reported as dammed.
	 */
	@Override
	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
		if (source == this) {
			return FOUND_SOURCE;
		}

		// check first input
		SourceAndDamReport res1 = this.input1.getSource().hasDamOnPathDownTo(source);
		if (res1 == FOUND_SOURCE_AND_DAM) {
			return FOUND_SOURCE_AND_DAM;
		}
		else if (res1 == FOUND_SOURCE) {
			if (this.input1.getLocalStrategy().dams() || this.input1.getTempMode().breaksPipeline() ||
					getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) {
				return FOUND_SOURCE_AND_DAM;
			} else {
				return FOUND_SOURCE;
			}
		}
		else {
			SourceAndDamReport res2 = this.input2.getSource().hasDamOnPathDownTo(source);
			if (res2 == FOUND_SOURCE_AND_DAM) {
				return FOUND_SOURCE_AND_DAM;
			}
			else if (res2 == FOUND_SOURCE) {
				if (this.input2.getLocalStrategy().dams() || this.input2.getTempMode().breaksPipeline() ||
						getDriverStrategy().secondDam() == DamBehavior.FULL_DAM) {
					return FOUND_SOURCE_AND_DAM;
				} else {
					return FOUND_SOURCE;
				}
			}
			else {
				// NOT_FOUND
				// check the broadcast inputs

				for (NamedChannel nc : getBroadcastInputs()) {
					SourceAndDamReport bcRes = nc.getSource().hasDamOnPathDownTo(source);
					if (bcRes != NOT_FOUND) {
						// broadcast inputs are always dams
						return FOUND_SOURCE_AND_DAM;
					}
				}
				return NOT_FOUND;
			}
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java new file mode 100644 index 0000000..d146c83 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.optimizer.plan;

/**
 * Marker type shared by the compiled plans of batch programs and of streaming programs.
 *
 * <p>The interface declares no members; it only lets both kinds of compiled plan be handled
 * through a single type.
 */
public interface FlinkPlan {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java new file mode 100644 index 0000000..38f76b2 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.optimizer.plan; + +import org.apache.flink.optimizer.dag.IterationNode; +import org.apache.flink.util.Visitor; + +/** + * + */ +public interface IterationPlanNode { + + void acceptForStepFunction(Visitor<PlanNode> visitor); + + IterationNode getIterationNode(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java new file mode 100644 index 0000000..3650eea --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.optimizer.plan; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.apache.flink.optimizer.costs.Costs; +import org.apache.flink.optimizer.dag.BinaryUnionNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.util.IterableIterator; +import org.apache.flink.util.Visitor; + +/** + * A union operation over multiple inputs (2 or more). + */ +public class NAryUnionPlanNode extends PlanNode { + + private final List<Channel> inputs; + + /** + * @param template + */ + public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> inputs, GlobalProperties gProps, + Costs cumulativeCosts) + { + super(template, "Union", DriverStrategy.NONE); + + this.inputs = inputs; + this.globalProps = gProps; + this.localProps = new LocalProperties(); + this.nodeCosts = new Costs(); + this.cumulativeCosts = cumulativeCosts; + } + + @Override + public void accept(Visitor<PlanNode> visitor) { + visitor.preVisit(this); + for (Channel c : this.inputs) { + c.getSource().accept(visitor); + } + visitor.postVisit(this); + } + + public List<Channel> getListOfInputs() { + return this.inputs; + } + + @Override + public Iterable<Channel> getInputs() { + return Collections.unmodifiableList(this.inputs); + } + + @Override + public Iterable<PlanNode> getPredecessors() { + final Iterator<Channel> channels = this.inputs.iterator(); + return new IterableIterator<PlanNode>() { + + @Override + public boolean hasNext() { + return channels.hasNext(); + } + + @Override + public PlanNode next() { + return channels.next().getSource(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator<PlanNode> iterator() { + return this; + } + }; + } + + @Override + public SourceAndDamReport 
hasDamOnPathDownTo(PlanNode source) { + // this node is used after the plan enumeration. consequently, this will never be invoked here + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java new file mode 100644 index 0000000..da97e61 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.plan; + +import org.apache.flink.optimizer.dag.TempMode; + +public class NamedChannel extends Channel { + + private final String name; + + /** + * Initializes NamedChannel. 
+ * + * @param sourceNode + */ + public NamedChannel(String name, PlanNode sourceNode) { + super(sourceNode); + this.name = name; + } + + public NamedChannel(String name, PlanNode sourceNode, TempMode tempMode) { + super(sourceNode, tempMode); + this.name = name; + } + + public String getName() { + return this.name; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java new file mode 100644 index 0000000..d56be87 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.plan; + +import java.util.Collection; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.util.Visitable; +import org.apache.flink.util.Visitor; + +/** + * The execution plan generated by the Optimizer. 
It contains {@link PlanNode}s + * and {@link Channel}s that describe exactly how the program should be executed. + * It defines all ship strategies (local pipe, shuffle, broadcast, rebalance), all + * operator strategies (sorting-merge join, hash join, sorted grouping, ...), + * and the data exchange modes (batched, pipelined). + */ +public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode> { + + /** The data sources in the plan. */ + private final Collection<SourcePlanNode> dataSources; + + /** The data sinks in the plan. */ + private final Collection<SinkPlanNode> dataSinks; + + /** All nodes in the optimizer plan. */ + private final Collection<PlanNode> allNodes; + + /** The original program. */ + private final Plan originalProgram; + + /** Name of the job */ + private final String jobName; + + /** + * Creates a new instance of this optimizer plan container. The plan is given and fully + * described by the data sources, sinks and the collection of all nodes. + * + * @param sources The data sources. + * @param sinks The data sinks. + * @param allNodes A collection containing all nodes in the plan. + * @param jobName The name of the program + */ + public OptimizedPlan(Collection<SourcePlanNode> sources, Collection<SinkPlanNode> sinks, + Collection<PlanNode> allNodes, String jobName, Plan programPlan) + { + this.dataSources = sources; + this.dataSinks = sinks; + this.allNodes = allNodes; + this.jobName = jobName; + this.originalProgram = programPlan; + } + + /** + * Gets the data sources from this OptimizedPlan. + * + * @return The data sources. + */ + public Collection<SourcePlanNode> getDataSources() { + return dataSources; + } + + /** + * Gets the data sinks from this OptimizedPlan. + * + * @return The data sinks. + */ + public Collection<SinkPlanNode> getDataSinks() { + return dataSinks; + } + + /** + * Gets all the nodes from this OptimizedPlan. + * + * @return All nodes. 
+ */ + public Collection<PlanNode> getAllNodes() { + return allNodes; + } + + /** + * Returns the name of the program. + * + * @return The name of the program. + */ + public String getJobName() { + return this.jobName; + } + + /** + * Gets the original program plan from which this optimized plan was created. + * + * @return The original program plan. + */ + public Plan getOriginalPactPlan() { + return this.originalProgram; + } + + // ------------------------------------------------------------------------ + + /** + * Applies the given visitor top down to all nodes, starting at the sinks. + * + * @param visitor + * The visitor to apply to the nodes in this plan. + * @see org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor) + */ + @Override + public void accept(Visitor<PlanNode> visitor) { + for (SinkPlanNode node : this.dataSinks) { + node.accept(visitor); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java new file mode 100644 index 0000000..6f634fb --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.plan; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.costs.Costs; +import org.apache.flink.optimizer.dag.OptimizerNode; +import org.apache.flink.optimizer.dag.OptimizerNode.UnclosedBranchDescriptor; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.plandump.DumpableConnection; +import org.apache.flink.optimizer.plandump.DumpableNode; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.runtime.operators.util.LocalStrategy; +import org.apache.flink.util.Visitable; + +/** + * The representation of a data exchange between to operators. The data exchange can realize a shipping strategy, + * which established global properties, and a local strategy, which establishes local properties. + * <p> + * Because we currently deal only with plans where the operator order is fixed, many properties are equal + * among candidates and are determined prior to the enumeration (such as for example constant/dynamic path membership). 
+ * Hence, many methods will delegate to the {@code OptimizerNode} that represents the node this candidate was + * created for. + */ +public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<PlanNode> { + + protected final OptimizerNode template; + + protected final List<Channel> outChannels; + + private List<NamedChannel> broadcastInputs; + + private final String nodeName; + + private DriverStrategy driverStrategy; // The local strategy (sorting / hashing, ...) + + protected LocalProperties localProps; // local properties of the data produced by this node + + protected GlobalProperties globalProps; // global properties of the data produced by this node + + protected Map<OptimizerNode, PlanNode> branchPlan; // the actual plan alternative chosen at a branch point + + protected Costs nodeCosts; // the costs incurred by this node + + protected Costs cumulativeCosts; // the cumulative costs of all operators in the sub-tree + + private double relativeMemoryPerSubTask; // the amount of memory dedicated to each task, in bytes + + private int parallelism; + + private boolean pFlag; // flag for the internal pruning algorithm + + // -------------------------------------------------------------------------------------------- + + public PlanNode(OptimizerNode template, String nodeName, DriverStrategy strategy) { + this.outChannels = new ArrayList<Channel>(2); + this.broadcastInputs = new ArrayList<NamedChannel>(); + this.template = template; + this.nodeName = nodeName; + this.driverStrategy = strategy; + + this.parallelism = template.getParallelism(); + + // check, if there is branch at this node. if yes, this candidate must be associated with + // the branching template node. 
+ if (template.isBranching()) { + this.branchPlan = new HashMap<OptimizerNode, PlanNode>(6); + this.branchPlan.put(template, this); + } + } + + protected void mergeBranchPlanMaps(PlanNode pred1, PlanNode pred2) { + mergeBranchPlanMaps(pred1.branchPlan, pred2.branchPlan); + } + + protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) { + // merge the branchPlan maps according the template's uncloseBranchesStack + if (this.template.hasUnclosedBranches()) { + if (this.branchPlan == null) { + this.branchPlan = new HashMap<OptimizerNode, PlanNode>(8); + } + + for (UnclosedBranchDescriptor uc : this.template.getOpenBranches()) { + OptimizerNode brancher = uc.getBranchingNode(); + PlanNode selectedCandidate = null; + + if (branchPlan1 != null) { + // predecessor 1 has branching children, see if it got the branch we are looking for + selectedCandidate = branchPlan1.get(brancher); + } + + if (selectedCandidate == null && branchPlan2 != null) { + // predecessor 2 has branching children, see if it got the branch we are looking for + selectedCandidate = branchPlan2.get(brancher); + } + + // it may be that the branch candidate is only found once the broadcast variables are set + if (selectedCandidate != null) { + this.branchPlan.put(brancher, selectedCandidate); + } + } + } + } + + // -------------------------------------------------------------------------------------------- + // Accessors + // -------------------------------------------------------------------------------------------- + + /** + * Gets the node from the optimizer DAG for which this plan candidate node was created. + * + * @return The optimizer's DAG node. + */ + public OptimizerNode getOriginalOptimizerNode() { + return this.template; + } + + /** + * Gets the program operator that this node represents in the plan. + * + * @return The program operator this node represents in the plan. 
+ */ + public Operator<?> getProgramOperator() { + return this.template.getOperator(); + } + + /** + * Gets the name of the plan node. + * + * @return The name of the plan node. + */ + public String getNodeName() { + return this.nodeName; + } + + public int getMemoryConsumerWeight() { + return this.driverStrategy.isMaterializing() ? 1 : 0; + } + + /** + * Gets the memory dedicated to each sub-task for this node. + * + * @return The memory per task, in bytes. + */ + public double getRelativeMemoryPerSubTask() { + return this.relativeMemoryPerSubTask; + } + + /** + * Sets the memory dedicated to each task for this node. + * + * @param relativeMemoryPerSubtask The relative memory per sub-task + */ + public void setRelativeMemoryPerSubtask(double relativeMemoryPerSubtask) { + this.relativeMemoryPerSubTask = relativeMemoryPerSubtask; + } + + /** + * Gets the driver strategy from this node. This determines for example for a <i>match</i> Pact whether + * to use a merge or a hybrid hash strategy. + * + * @return The driver strategy. + */ + public DriverStrategy getDriverStrategy() { + return this.driverStrategy; + } + + /** + * Sets the driver strategy for this node. Usually should not be changed. + * + * @param newDriverStrategy The driver strategy. + */ + public void setDriverStrategy(DriverStrategy newDriverStrategy) { + this.driverStrategy = newDriverStrategy; + } + + public void initProperties(GlobalProperties globals, LocalProperties locals) { + if (this.globalProps != null || this.localProps != null) { + throw new IllegalStateException(); + } + this.globalProps = globals; + this.localProps = locals; + } + + /** + * Gets the local properties from this PlanNode. + * + * @return The local properties. + */ + public LocalProperties getLocalProperties() { + return this.localProps; + } + + /** + * Gets the global properties from this PlanNode. + * + * @return The global properties. 
+ */ + public GlobalProperties getGlobalProperties() { + return this.globalProps; + } + + /** + * Gets the costs incurred by this node. The costs reflect also the costs incurred by the shipping strategies + * of the incoming connections. + * + * @return The node-costs, or null, if not yet set. + */ + public Costs getNodeCosts() { + return this.nodeCosts; + } + + /** + * Gets the cumulative costs of this nose. The cumulative costs are the sum of the costs + * of this node and of all nodes in the subtree below this node. + * + * @return The cumulative costs, or null, if not yet set. + */ + public Costs getCumulativeCosts() { + return this.cumulativeCosts; + } + + public Costs getCumulativeCostsShare() { + if (this.cumulativeCosts == null) { + return null; + } else { + Costs result = cumulativeCosts.clone(); + if (this.template.getOutgoingConnections() != null) { + int outDegree = this.template.getOutgoingConnections().size(); + if (outDegree > 0) { + result.divideBy(outDegree); + } + } + + return result; + } + } + + + /** + * Sets the basic cost for this node to the given value, and sets the cumulative costs + * to those costs plus the cost shares of all inputs (regular and broadcast). + * + * @param nodeCosts The already knows costs for this node + * (this cost a produces by a concrete {@code OptimizerNode} subclass. 
+ */ + public void setCosts(Costs nodeCosts) { + // set the node costs + this.nodeCosts = nodeCosts; + + // the cumulative costs are the node costs plus the costs of all inputs + this.cumulativeCosts = nodeCosts.clone(); + + // add all the normal inputs + for (PlanNode pred : getPredecessors()) { + + Costs parentCosts = pred.getCumulativeCostsShare(); + if (parentCosts != null) { + this.cumulativeCosts.addCosts(parentCosts); + } else { + throw new CompilerException("Trying to set the costs of an operator before the predecessor costs are computed."); + } + } + + // add all broadcast variable inputs + if (this.broadcastInputs != null) { + for (NamedChannel nc : this.broadcastInputs) { + Costs bcInputCost = nc.getSource().getCumulativeCostsShare(); + if (bcInputCost != null) { + this.cumulativeCosts.addCosts(bcInputCost); + } else { + throw new CompilerException("Trying to set the costs of an operator before the broadcast input costs are computed."); + } + } + } + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public int getParallelism() { + return this.parallelism; + } + + public long getGuaranteedAvailableMemory() { + return this.template.getMinimalMemoryAcrossAllSubTasks(); + } + + public Map<OptimizerNode, PlanNode> getBranchPlan() { + return branchPlan; + } + + // -------------------------------------------------------------------------------------------- + // Input, Predecessors, Successors + // -------------------------------------------------------------------------------------------- + + public abstract Iterable<Channel> getInputs(); + + @Override + public abstract Iterable<PlanNode> getPredecessors(); + + /** + * Sets a list of all broadcast inputs attached to this node. 
+ */ + public void setBroadcastInputs(List<NamedChannel> broadcastInputs) { + if (broadcastInputs != null) { + this.broadcastInputs = broadcastInputs; + + // update the branch map + for (NamedChannel nc : broadcastInputs) { + PlanNode source = nc.getSource(); + + mergeBranchPlanMaps(branchPlan, source.branchPlan); + } + } + + // do a sanity check that if we are branching, we have now candidates for each branch point + if (this.template.hasUnclosedBranches()) { + if (this.branchPlan == null) { + throw new CompilerException("Branching and rejoining logic did not find a candidate for the branching point."); + } + + for (UnclosedBranchDescriptor uc : this.template.getOpenBranches()) { + OptimizerNode brancher = uc.getBranchingNode(); + if (this.branchPlan.get(brancher) == null) { + throw new CompilerException("Branching and rejoining logic did not find a candidate for the branching point."); + } + } + } + } + + /** + * Gets a list of all broadcast inputs attached to this node. + */ + public List<NamedChannel> getBroadcastInputs() { + return this.broadcastInputs; + } + + /** + * Adds a channel to a successor node to this node. + * + * @param channel The channel to the successor. + */ + public void addOutgoingChannel(Channel channel) { + this.outChannels.add(channel); + } + + /** + * Gets a list of all outgoing channels leading to successors. + * + * @return A list of all channels leading to successors. 
+ */ + public List<Channel> getOutgoingChannels() { + return this.outChannels; + } + + // -------------------------------------------------------------------------------------------- + // Miscellaneous + // -------------------------------------------------------------------------------------------- + + public void updatePropertiesWithUniqueSets(Set<FieldSet> uniqueFieldCombinations) { + if (uniqueFieldCombinations == null || uniqueFieldCombinations.isEmpty()) { + return; + } + for (FieldSet fields : uniqueFieldCombinations) { + this.globalProps.addUniqueFieldCombination(fields); + this.localProps = this.localProps.addUniqueFields(fields); + } + } + + public PlanNode getCandidateAtBranchPoint(OptimizerNode branchPoint) { + if (branchPlan == null) { + return null; + } else { + return this.branchPlan.get(branchPoint); + } + } + + /** + * Sets the pruning marker to true. + */ + public void setPruningMarker() { + this.pFlag = true; + } + + /** + * Checks whether the pruning marker was set. + * + * @return True, if the pruning marker was set, false otherwise. + */ + public boolean isPruneMarkerSet() { + return this.pFlag; + } + + public boolean isOnDynamicPath() { + return this.template.isOnDynamicPath(); + } + + public int getCostWeight() { + return this.template.getCostWeight(); + } + + // -------------------------------------------------------------------------------------------- + + /** + * Checks whether this node has a dam on the way down to the given source node. This method + * returns either that (a) the source node is not found as a (transitive) child of this node, + * (b) the node is found, but no dam is on the path, or (c) the node is found and a dam is on + * the path. + * + * @param source The node on the path to which the dam is sought. + * @return The result whether the node is found and whether a dam is on the path. 
+ */ + public abstract SourceAndDamReport hasDamOnPathDownTo(PlanNode source); + + public FeedbackPropertiesMeetRequirementsReport checkPartialSolutionPropertiesMet(PlanNode partialSolution, GlobalProperties feedbackGlobal, LocalProperties feedbackLocal) { + if (this == partialSolution) { + return FeedbackPropertiesMeetRequirementsReport.PENDING; + } + + boolean found = false; + boolean allMet = true; + boolean allLocallyMet = true; + + for (Channel input : getInputs()) { + FeedbackPropertiesMeetRequirementsReport inputState = input.getSource().checkPartialSolutionPropertiesMet(partialSolution, feedbackGlobal, feedbackLocal); + + if (inputState == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) { + continue; + } + else if (inputState == FeedbackPropertiesMeetRequirementsReport.MET) { + found = true; + continue; + } + else if (inputState == FeedbackPropertiesMeetRequirementsReport.NOT_MET) { + return FeedbackPropertiesMeetRequirementsReport.NOT_MET; + } + else { + found = true; + + // the partial solution was on the path here. 
check whether the channel requires + // certain properties that are met, or whether the channel introduces new properties + + // if the plan introduces new global properties, then we can stop looking whether + // the feedback properties are sufficient to meet the requirements + if (input.getShipStrategy() != ShipStrategyType.FORWARD && input.getShipStrategy() != ShipStrategyType.NONE) { + continue; + } + + // first check whether this channel requires something that is not met + if (input.getRequiredGlobalProps() != null && !input.getRequiredGlobalProps().isMetBy(feedbackGlobal)) { + return FeedbackPropertiesMeetRequirementsReport.NOT_MET; + } + + // in general, not everything is met here already + allMet = false; + + // if the plan introduces new local properties, we can stop checking for matching local properties + if (inputState != FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET) { + + if (input.getLocalStrategy() == LocalStrategy.NONE) { + + if (input.getRequiredLocalProps() != null && !input.getRequiredLocalProps().isMetBy(feedbackLocal)) { + return FeedbackPropertiesMeetRequirementsReport.NOT_MET; + } + + allLocallyMet = false; + } + } + } + } + + if (!found) { + return FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION; + } else if (allMet) { + return FeedbackPropertiesMeetRequirementsReport.MET; + } else if (allLocallyMet) { + return FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET; + } else { + return FeedbackPropertiesMeetRequirementsReport.PENDING; + } + } + + // -------------------------------------------------------------------------------------------- + + + @Override + public String toString() { + return this.template.getName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy + + " [[ " + this.globalProps + " ]] [[ " + this.localProps + " ]]"; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public OptimizerNode 
getOptimizerNode() { + return this.template; + } + + @Override + public PlanNode getPlanNode() { + return this; + } + + @Override + public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() { + List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>(); + + for (Channel c : getInputs()) { + allInputs.add(c); + } + + for (NamedChannel c : getBroadcastInputs()) { + allInputs.add(c); + } + + return allInputs; + } + + // -------------------------------------------------------------------------------------------- + + public static enum SourceAndDamReport { + NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM; + } + + + + public static enum FeedbackPropertiesMeetRequirementsReport { + /** Indicates that the path is irrelevant */ + NO_PARTIAL_SOLUTION, + + /** Indicates that the question whether the properties are met has been determined pending + * dependent on global and local properties */ + PENDING, + + /** Indicates that the question whether the properties are met has been determined pending + * dependent on global properties only */ + PENDING_LOCAL_MET, + + /** Indicates that the question whether the properties are met has been determined true */ + MET, + + /** Indicates that the question whether the properties are met has been determined false */ + NOT_MET; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java new file mode 100644 index 0000000..b928be7 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for an operator with one regular input channel. Broadcast inputs, if any,
+ * are obtained from {@link PlanNode#getBroadcastInputs()}.
+ * <p>
+ * For every comparator required by the node's {@link DriverStrategy}
+ * (see {@link DriverStrategy#getNumRequiredComparators()}), the node holds one slot of key
+ * fields, sort orders and comparator factory.
+ */
+public class SingleInputPlanNode extends PlanNode {
+
+	protected final Channel input;
+
+	protected final FieldList[] driverKeys;
+
+	protected final boolean[][] driverSortOrders;
+
+	private TypeComparatorFactory<?>[] comparators;
+
+	public Object postPassHelper;
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a node without driver key information.
+	 *
+	 * @param template The optimizer node this candidate is created for.
+	 * @param nodeName The name of the node.
+	 * @param input The input channel.
+	 * @param driverStrategy The driver strategy of the node.
+	 */
+	public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input, DriverStrategy driverStrategy) {
+		this(template, nodeName, input, driverStrategy, null, null);
+	}
+
+	/**
+	 * Creates a node whose first driver comparator uses the given key fields, with every
+	 * sort-order flag set to {@code true}.
+	 *
+	 * @param template The optimizer node this candidate is created for.
+	 * @param nodeName The name of the node.
+	 * @param input The input channel.
+	 * @param driverStrategy The driver strategy of the node.
+	 * @param driverKeyFields The key fields of the first driver comparator; may be {@code null}.
+	 */
+	public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input,
+			DriverStrategy driverStrategy, FieldList driverKeyFields)
+	{
+		// a null key list is passed through with null sort orders (as the four-argument
+		// constructor does) instead of failing on driverKeyFields.size()
+		this(template, nodeName, input, driverStrategy, driverKeyFields,
+				driverKeyFields == null ? null : getTrueArray(driverKeyFields.size()));
+	}
+
+	/**
+	 * Creates a node whose first driver comparator uses the given key fields and sort orders.
+	 *
+	 * @param template The optimizer node this candidate is created for.
+	 * @param nodeName The name of the node.
+	 * @param input The input channel.
+	 * @param driverStrategy The driver strategy of the node.
+	 * @param driverKeyFields The key fields of the first driver comparator; may be {@code null}.
+	 * @param driverSortOrders The sort orders of the first driver comparator; may be {@code null}.
+	 */
+	public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input,
+			DriverStrategy driverStrategy, FieldList driverKeyFields, boolean[] driverSortOrders)
+	{
+		super(template, nodeName, driverStrategy);
+		this.input = input;
+
+		final int numComparators = driverStrategy.getNumRequiredComparators();
+		this.comparators = new TypeComparatorFactory<?>[numComparators];
+		this.driverKeys = new FieldList[numComparators];
+		this.driverSortOrders = new boolean[numComparators][];
+
+		// the constructor arguments describe the first driver comparator; other slots can be
+		// filled through setDriverKeyInfo(...)
+		if (numComparators > 0) {
+			this.driverKeys[0] = driverKeyFields;
+			this.driverSortOrders[0] = driverSortOrders;
+		}
+
+		// a broadcast input channel gets this node's parallelism as its replication factor
+		if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) {
+			this.input.setReplicationFactor(getParallelism());
+		}
+
+		// copy the predecessor's branch plan entries into this node's branch plan
+		final PlanNode predNode = input.getSource();
+		if (predNode.branchPlan != null && !predNode.branchPlan.isEmpty()) {
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+			}
+			this.branchPlan.putAll(predNode.branchPlan);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the template of this node as a {@link SingleInputNode}.
+	 *
+	 * @return The template optimizer node.
+	 * @throws RuntimeException If the template is not a {@link SingleInputNode}.
+	 */
+	public SingleInputNode getSingleInputNode() {
+		if (this.template instanceof SingleInputNode) {
+			return (SingleInputNode) this.template;
+		} else {
+			throw new RuntimeException("The template of this plan node is not a SingleInputNode: " + this.template);
+		}
+	}
+
+	/**
+	 * Gets the input channel to this node.
+	 *
+	 * @return The input channel to this node.
+	 */
+	public Channel getInput() {
+		return this.input;
+	}
+
+	/**
+	 * Gets the predecessor of this node, i.e. the source of the input channel.
+	 *
+	 * @return The predecessor of this node.
+	 */
+	public PlanNode getPredecessor() {
+		return this.input.getSource();
+	}
+
+	/**
+	 * Sets the key field indexes for the specified driver comparator, with every sort-order flag
+	 * set to {@code true}.
+	 *
+	 * @param keys The key field indexes for the specified driver comparator; may be {@code null}.
+	 * @param id The ID of the driver comparator.
+	 * @throws CompilerException If the ID is outside the range of required comparators.
+	 */
+	public void setDriverKeyInfo(FieldList keys, int id) {
+		this.setDriverKeyInfo(keys, keys == null ? null : getTrueArray(keys.size()), id);
+	}
+
+	/**
+	 * Sets the key field information for the specified driver comparator.
+	 *
+	 * @param keys The key field indexes for the specified driver comparator.
+	 * @param sortOrder The key sort order for the specified driver comparator.
+	 * @param id The ID of the driver comparator.
+	 * @throws CompilerException If the ID is outside the range of required comparators.
+	 */
+	public void setDriverKeyInfo(FieldList keys, boolean[] sortOrder, int id) {
+		if (id < 0 || id >= driverKeys.length) {
+			throw new CompilerException("Invalid id for driver key information. DriverStrategy requires only "
+					+ super.getDriverStrategy().getNumRequiredComparators() + " comparators.");
+		}
+		this.driverKeys[id] = keys;
+		this.driverSortOrders[id] = sortOrder;
+	}
+
+	/**
+	 * Gets the key field indexes for the specified driver comparator.
+	 *
+	 * @param id The id of the driver comparator for which the key field indexes are requested.
+	 * @return The key field indexes of the specified driver comparator.
+	 */
+	public FieldList getKeys(int id) {
+		return this.driverKeys[id];
+	}
+
+	/**
+	 * Gets the sort order for the specified driver comparator.
+	 *
+	 * @param id The id of the driver comparator for which the sort order is requested.
+	 * @return The sort order of the specified driver comparator.
+	 */
+	public boolean[] getSortOrders(int id) {
+		return driverSortOrders[id];
+	}
+
+	/**
+	 * Gets the specified comparator from this PlanNode.
+	 *
+	 * @param id The ID of the requested comparator.
+	 *
+	 * @return The specified comparator.
+	 */
+	public TypeComparatorFactory<?> getComparator(int id) {
+		return comparators[id];
+	}
+
+	/**
+	 * Sets the specified comparator for this PlanNode.
+	 *
+	 * @param comparator The comparator to set.
+	 * @param id The ID of the comparator to set.
+	 */
+	public void setComparator(TypeComparatorFactory<?> comparator, int id) {
+		this.comparators[id] = comparator;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * If the visitor's pre-visit returns {@code true}, visits the source of the regular input,
+	 * then the sources of all broadcast inputs, and finally post-visits this node.
+	 */
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		if (visitor.preVisit(this)) {
+			this.input.getSource().accept(visitor);
+
+			for (Channel broadcastInput : getBroadcastInputs()) {
+				broadcastInput.getSource().accept(visitor);
+			}
+
+			visitor.postVisit(this);
+		}
+	}
+
+	/**
+	 * Returns the source of the regular input, followed by the sources of the broadcast inputs.
+	 */
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
+			return Collections.singleton(this.input.getSource());
+		}
+		else {
+			List<PlanNode> preds = new ArrayList<PlanNode>();
+			preds.add(input.getSource());
+
+			for (Channel c : getBroadcastInputs()) {
+				preds.add(c.getSource());
+			}
+
+			return preds;
+		}
+	}
+
+	/**
+	 * Returns only the regular input channel; broadcast inputs are not included.
+	 */
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Collections.singleton(this.input);
+	}
+
+	/**
+	 * Checks whether {@code source} is reachable from this node and whether a dam lies on the
+	 * path. A source reached only through a broadcast input is always reported with a dam.
+	 */
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE;
+		}
+		SourceAndDamReport res = this.input.getSource().hasDamOnPathDownTo(source);
+		if (res == FOUND_SOURCE_AND_DAM) {
+			return FOUND_SOURCE_AND_DAM;
+		}
+		else if (res == FOUND_SOURCE) {
+			// the regular input path dams if the channel's local strategy dams, its temp mode
+			// breaks the pipeline, or the driver's first-input dam behavior is FULL_DAM
+			return (this.input.getLocalStrategy().dams() || this.input.getTempMode().breaksPipeline() ||
+					getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ?
+				FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
+		}
+		else {
+			// NOT_FOUND through the regular input: check the broadcast inputs
+			for (NamedChannel nc : getBroadcastInputs()) {
+				SourceAndDamReport bcRes = nc.getSource().hasDamOnPathDownTo(source);
+				if (bcRes != NOT_FOUND) {
+					// broadcast inputs are always dams
+					return FOUND_SOURCE_AND_DAM;
+				}
+			}
+			return NOT_FOUND;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a boolean array of the given length with every element set to {@code true}.
+	 *
+	 * @param length The length of the array.
+	 * @return The array filled with {@code true}.
+	 */
+	protected static boolean[] getTrueArray(int length) {
+		final boolean[] a = new boolean[length];
+		Arrays.fill(a, true);
+		return a;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
new file mode 100644
index 0000000..451484d
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.List;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+/**
+ * Utility plan node that joins two inputs, each of which is a sink or another sink joiner,
+ * using the {@link DriverStrategy#BINARY_NO_OP} driver strategy.
+ */
+public class SinkJoinerPlanNode extends DualInputPlanNode {
+
+	public SinkJoinerPlanNode(SinkJoiner template, Channel input1, Channel input2) {
+		super(template, "", input1, input2, DriverStrategy.BINARY_NO_OP);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Sets this node's costs to the sum of the cumulative costs of its two input sources. The
+	 * given costs are ignored.
+	 *
+	 * @param nodeCosts Ignored.
+	 */
+	@Override
+	public void setCosts(Costs nodeCosts) {
+		// The plan enumeration logic works as for regular two-input operators, which is important
+		// because of the branch handling logic. It does pick redistributing network channels
+		// between the sinks and the sink joiner, because the sink joiner has a different DOP than
+		// the sinks. We therefore discard any given cost and simply use the sum of the cumulative
+		// costs of the two children.
+		Costs totalCosts = getInput1().getSource().getCumulativeCosts().clone();
+		totalCosts.addCosts(getInput2().getSource().getCumulativeCosts());
+		super.setCosts(totalCosts);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Appends all sink plan nodes below this sink joiner to the given list, descending
+	 * depth-first into nested sink joiners. The first input's sinks are added before the second
+	 * input's sinks.
+	 *
+	 * @param sinks The list to which the found sinks are appended.
+	 * @throws CompilerException If an input is neither a sink nor a sink joiner.
+	 */
+	public void getDataSinks(List<SinkPlanNode> sinks) {
+		collectDataSinks(this.input1.getSource(), sinks);
+		collectDataSinks(this.input2.getSource(), sinks);
+	}
+
+	/** Adds {@code node} if it is a sink, or all sinks below it if it is a sink joiner. */
+	private static void collectDataSinks(PlanNode node, List<SinkPlanNode> sinks) {
+		if (node instanceof SinkPlanNode) {
+			sinks.add((SinkPlanNode) node);
+		} else if (node instanceof SinkJoinerPlanNode) {
+			((SinkJoinerPlanNode) node).getDataSinks(sinks);
+		} else {
+			throw new CompilerException("Illegal child node for a sink joiner utility node: Neither Sink nor Sink Joiner");
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
new file mode 100644
index 0000000..656e67f
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Plan candidate node for data flow sinks.
+ */
+public class SinkPlanNode extends SingleInputPlanNode {
+
+	/**
+	 * Constructs a new sink candidate node that uses <i>NONE</i> as its driver strategy. Note that
+	 * local sorting and range partitioning are handled by the incoming channel already. The node
+	 * takes copies of the global and local properties of its input channel.
+	 *
+	 * @param template The template optimizer node that this candidate is created for.
+	 * @param nodeName The name of the node.
+	 * @param input The channel through which the sink receives its data.
+	 */
+	public SinkPlanNode(DataSinkNode template, String nodeName, Channel input) {
+		super(template, nodeName, input, DriverStrategy.NONE);
+
+		this.globalProps = input.getGlobalProperties().clone();
+		this.localProps = input.getLocalProperties().clone();
+	}
+
+	/**
+	 * Gets the template of this node as a {@link DataSinkNode}.
+	 *
+	 * @return The data sink node this candidate was created for.
+	 * @throws RuntimeException If the template is not a {@link DataSinkNode}.
+	 */
+	public DataSinkNode getSinkNode() {
+		if (this.template instanceof DataSinkNode) {
+			return (DataSinkNode) this.template;
+		} else {
+			throw new RuntimeException("The template of this sink plan node is not a DataSinkNode: " + this.template);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
new file mode 100644
index 0000000..63093dd
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
@@ -0,0 +1,124 @@
+/*
+
* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SolutionSetNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for the solution set of a workset iteration. Within the iteration it has
+ * no predecessors and no inputs, and it incurs no costs.
+ */
+public class SolutionSetPlanNode extends PlanNode {
+
+	private static final Costs NO_COSTS = new Costs();
+
+	private WorksetIterationPlanNode containingIterationNode;
+
+	private final Channel initialInput;
+
+	public Object postPassHelper;
+
+
+	/**
+	 * Creates the solution set candidate node.
+	 *
+	 * @param template The solution set node this candidate is created for.
+	 * @param nodeName The name of the node.
+	 * @param gProps The global properties of the solution set.
+	 * @param lProps The local properties of the solution set.
+	 * @param initialInput The channel that provides the initial solution set.
+	 */
+	public SolutionSetPlanNode(SolutionSetNode template, String nodeName,
+			GlobalProperties gProps, LocalProperties lProps,
+			Channel initialInput)
+	{
+		super(template, nodeName, DriverStrategy.NONE);
+
+		this.globalProps = gProps;
+		this.localProps = lProps;
+		this.initialInput = initialInput;
+
+		// the node incurs no cost
+		this.nodeCosts = NO_COSTS;
+		this.cumulativeCosts = NO_COSTS;
+
+		// copy the branch plan entries of the initial input's source into this node's branch plan
+		final PlanNode initialSource = initialInput.getSource();
+		if (initialSource.branchPlan != null && !initialSource.branchPlan.isEmpty()) {
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+			}
+
+			this.branchPlan.putAll(initialSource.branchPlan);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the template of this node, cast to {@link SolutionSetNode}.
+	 *
+	 * @return The solution set node this candidate was created for.
+	 */
+	public SolutionSetNode getSolutionSetNode() {
+		return (SolutionSetNode) this.template;
+	}
+
+	/**
+	 * Gets the workset iteration plan node that contains this solution set.
+	 *
+	 * @return The containing iteration node, or {@code null} if it has not been set yet.
+	 */
+	public WorksetIterationPlanNode getContainingIterationNode() {
+		return this.containingIterationNode;
+	}
+
+	/**
+	 * Sets the workset iteration plan node that contains this solution set.
+	 *
+	 * @param containingIterationNode The containing iteration node.
+	 */
+	public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) {
+		this.containingIterationNode = containingIterationNode;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Pre-visits this node and, if that returns {@code true}, post-visits it. There are no inputs
+	 * to descend into.
+	 */
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		if (visitor.preVisit(this)) {
+			visitor.postVisit(this);
+		}
+	}
+
+	/** Returns an empty iterable: the solution set has no predecessors in the plan. */
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		return Collections.<PlanNode>emptyList();
+	}
+
+	/** Returns an empty iterable: the solution set exposes no input channels. */
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Collections.<Channel>emptyList();
+	}
+
+	/**
+	 * Reports {@code FOUND_SOURCE_AND_DAM} whenever {@code source} is this node or is reachable
+	 * through the initial input, and {@code NOT_FOUND} otherwise.
+	 */
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE_AND_DAM;
+		}
+
+		SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source);
+		if (res == FOUND_SOURCE_AND_DAM || res == FOUND_SOURCE) {
+			return FOUND_SOURCE_AND_DAM;
+		} else {
+			return NOT_FOUND;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
new file mode 100644
index 0000000..11b7cc9
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.dag.DataSourceNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate for a data source: a leaf of the plan with no predecessors and no input
+ * channels, running with the <i>NONE</i> driver strategy.
+ */
+public class SourcePlanNode extends PlanNode {
+
+	/** Serializer factory attached to this node through {@link #setSerializer}. */
+	private TypeSerializerFactory<?> serializer;
+
+	/**
+	 * Creates a source candidate with freshly constructed global and local properties.
+	 *
+	 * @param template The data source node this candidate is created for.
+	 * @param nodeName The name of the node.
+	 */
+	public SourcePlanNode(DataSourceNode template, String nodeName) {
+		this(template, nodeName, new GlobalProperties(), new LocalProperties());
+	}
+
+	/**
+	 * Creates a source candidate with the given properties. The properties are then passed
+	 * through {@code updatePropertiesWithUniqueSets} with the template's unique fields.
+	 *
+	 * @param template The data source node this candidate is created for.
+	 * @param nodeName The name of the node.
+	 * @param gprops The global properties of the source.
+	 * @param lprops The local properties of the source.
+	 */
+	public SourcePlanNode(DataSourceNode template, String nodeName, GlobalProperties gprops, LocalProperties lprops) {
+		super(template, nodeName, DriverStrategy.NONE);
+
+		this.globalProps = gprops;
+		this.localProps = lprops;
+		updatePropertiesWithUniqueSets(template.getUniqueFields());
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the template of this node, cast to {@link DataSourceNode}.
+	 *
+	 * @return The data source node this candidate was created for.
+	 */
+	public DataSourceNode getDataSourceNode() {
+		return (DataSourceNode) this.template;
+	}
+
+	/**
+	 * Gets the serializer factory attached to this node.
+	 *
+	 * @return The serializer factory, or {@code null} if none has been set.
+	 */
+	public TypeSerializerFactory<?> getSerializer() {
+		return this.serializer;
+	}
+
+	/**
+	 * Attaches a serializer factory to this node.
+	 *
+	 * @param serializer The serializer factory to attach.
+	 */
+	public void setSerializer(TypeSerializerFactory<?> serializer) {
+		this.serializer = serializer;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Pre-visits this node and, only if that returns {@code true}, post-visits it; a source has no
+	 * inputs to descend into.
+	 */
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		if (!visitor.preVisit(this)) {
+			return;
+		}
+		visitor.postVisit(this);
+	}
+
+	/** A source has no predecessors, so this is always empty. */
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		return Collections.<PlanNode>emptyList();
+	}
+
+	/** A source has no input channels, so this is always empty. */
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Collections.<Channel>emptyList();
+	}
+
+	/**
+	 * The only node a source can reach is itself, and never through a dam.
+	 */
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		return source == this ? FOUND_SOURCE : NOT_FOUND;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
new file mode 100644
index 0000000..880f2e3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * Base class for the {@link FlinkPlan} of a Flink streaming program. Subclasses provide the
+ * program's {@link JobGraph} and a JSON rendering of the plan.
+ */
+public abstract class StreamingPlan implements FlinkPlan {
+
+	/**
+	 * Gets the job graph of this streaming plan.
+	 *
+	 * @return The job graph.
+	 */
+	public abstract JobGraph getJobGraph();
+
+	/**
+	 * Gets a JSON rendering of this streaming plan.
+	 *
+	 * @return The plan as a JSON string.
+	 */
+	public abstract String getStreamingPlanAsJSON();
+
+	/**
+	 * Writes a JSON rendering of this streaming plan to the given file.
+	 *
+	 * @param file The target file.
+	 * @throws IOException If the file cannot be written.
+	 */
+	public abstract void dumpStreamingPlanAsJSON(File file) throws IOException;
+
+}