http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java deleted file mode 100644 index 6f634fb..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java +++ /dev/null @@ -1,573 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plan; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.optimizer.dag.OptimizerNode.UnclosedBranchDescriptor; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.plandump.DumpableConnection; -import org.apache.flink.optimizer.plandump.DumpableNode; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.util.Visitable; - -/** - * The representation of a data exchange between to operators. The data exchange can realize a shipping strategy, - * which established global properties, and a local strategy, which establishes local properties. - * <p> - * Because we currently deal only with plans where the operator order is fixed, many properties are equal - * among candidates and are determined prior to the enumeration (such as for example constant/dynamic path membership). - * Hence, many methods will delegate to the {@code OptimizerNode} that represents the node this candidate was - * created for. - */ -public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<PlanNode> { - - protected final OptimizerNode template; - - protected final List<Channel> outChannels; - - private List<NamedChannel> broadcastInputs; - - private final String nodeName; - - private DriverStrategy driverStrategy; // The local strategy (sorting / hashing, ...) - - protected LocalProperties localProps; // local properties of the data produced by this node - - protected GlobalProperties globalProps; // global properties of the data produced by this node - - protected Map<OptimizerNode, PlanNode> branchPlan; // the actual plan alternative chosen at a branch point - - protected Costs nodeCosts; // the costs incurred by this node - - protected Costs cumulativeCosts; // the cumulative costs of all operators in the sub-tree - - private double relativeMemoryPerSubTask; // the amount of memory dedicated to each task, in bytes - - private int parallelism; - - private boolean pFlag; // flag for the internal pruning algorithm - - // -------------------------------------------------------------------------------------------- - - public PlanNode(OptimizerNode template, String nodeName, DriverStrategy strategy) { - this.outChannels = new ArrayList<Channel>(2); - this.broadcastInputs = new ArrayList<NamedChannel>(); - this.template = template; - this.nodeName = nodeName; - this.driverStrategy = strategy; - - this.parallelism = template.getParallelism(); - - // check, if there is branch at this node. if yes, this candidate must be associated with - // the branching template node. - if (template.isBranching()) { - this.branchPlan = new HashMap<OptimizerNode, PlanNode>(6); - this.branchPlan.put(template, this); - } - } - - protected void mergeBranchPlanMaps(PlanNode pred1, PlanNode pred2) { - mergeBranchPlanMaps(pred1.branchPlan, pred2.branchPlan); - } - - protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) { - // merge the branchPlan maps according the template's uncloseBranchesStack - if (this.template.hasUnclosedBranches()) { - if (this.branchPlan == null) { - this.branchPlan = new HashMap<OptimizerNode, PlanNode>(8); - } - - for (UnclosedBranchDescriptor uc : this.template.getOpenBranches()) { - OptimizerNode brancher = uc.getBranchingNode(); - PlanNode selectedCandidate = null; - - if (branchPlan1 != null) { - // predecessor 1 has branching children, see if it got the branch we are looking for - selectedCandidate = branchPlan1.get(brancher); - } - - if (selectedCandidate == null && branchPlan2 != null) { - // predecessor 2 has branching children, see if it got the branch we are looking for - selectedCandidate = branchPlan2.get(brancher); - } - - // it may be that the branch candidate is only found once the broadcast variables are set - if (selectedCandidate != null) { - this.branchPlan.put(brancher, selectedCandidate); - } - } - } - } - - // -------------------------------------------------------------------------------------------- - // Accessors - // -------------------------------------------------------------------------------------------- - - /** - * Gets the node from the optimizer DAG for which this plan candidate node was created. - * - * @return The optimizer's DAG node. - */ - public OptimizerNode getOriginalOptimizerNode() { - return this.template; - } - - /** - * Gets the program operator that this node represents in the plan. - * - * @return The program operator this node represents in the plan. - */ - public Operator<?> getProgramOperator() { - return this.template.getOperator(); - } - - /** - * Gets the name of the plan node. - * - * @return The name of the plan node. - */ - public String getNodeName() { - return this.nodeName; - } - - public int getMemoryConsumerWeight() { - return this.driverStrategy.isMaterializing() ? 1 : 0; - } - - /** - * Gets the memory dedicated to each sub-task for this node. - * - * @return The memory per task, in bytes. - */ - public double getRelativeMemoryPerSubTask() { - return this.relativeMemoryPerSubTask; - } - - /** - * Sets the memory dedicated to each task for this node. - * - * @param relativeMemoryPerSubtask The relative memory per sub-task - */ - public void setRelativeMemoryPerSubtask(double relativeMemoryPerSubtask) { - this.relativeMemoryPerSubTask = relativeMemoryPerSubtask; - } - - /** - * Gets the driver strategy from this node. This determines for example for a <i>match</i> Pact whether - * to use a merge or a hybrid hash strategy. - * - * @return The driver strategy. - */ - public DriverStrategy getDriverStrategy() { - return this.driverStrategy; - } - - /** - * Sets the driver strategy for this node. Usually should not be changed. - * - * @param newDriverStrategy The driver strategy. - */ - public void setDriverStrategy(DriverStrategy newDriverStrategy) { - this.driverStrategy = newDriverStrategy; - } - - public void initProperties(GlobalProperties globals, LocalProperties locals) { - if (this.globalProps != null || this.localProps != null) { - throw new IllegalStateException(); - } - this.globalProps = globals; - this.localProps = locals; - } - - /** - * Gets the local properties from this PlanNode. - * - * @return The local properties. - */ - public LocalProperties getLocalProperties() { - return this.localProps; - } - - /** - * Gets the global properties from this PlanNode. - * - * @return The global properties. - */ - public GlobalProperties getGlobalProperties() { - return this.globalProps; - } - - /** - * Gets the costs incurred by this node. The costs reflect also the costs incurred by the shipping strategies - * of the incoming connections. - * - * @return The node-costs, or null, if not yet set. - */ - public Costs getNodeCosts() { - return this.nodeCosts; - } - - /** - * Gets the cumulative costs of this nose. The cumulative costs are the sum of the costs - * of this node and of all nodes in the subtree below this node. - * - * @return The cumulative costs, or null, if not yet set. - */ - public Costs getCumulativeCosts() { - return this.cumulativeCosts; - } - - public Costs getCumulativeCostsShare() { - if (this.cumulativeCosts == null) { - return null; - } else { - Costs result = cumulativeCosts.clone(); - if (this.template.getOutgoingConnections() != null) { - int outDegree = this.template.getOutgoingConnections().size(); - if (outDegree > 0) { - result.divideBy(outDegree); - } - } - - return result; - } - } - - - /** - * Sets the basic cost for this node to the given value, and sets the cumulative costs - * to those costs plus the cost shares of all inputs (regular and broadcast). - * - * @param nodeCosts The already knows costs for this node - * (this cost a produces by a concrete {@code OptimizerNode} subclass. - */ - public void setCosts(Costs nodeCosts) { - // set the node costs - this.nodeCosts = nodeCosts; - - // the cumulative costs are the node costs plus the costs of all inputs - this.cumulativeCosts = nodeCosts.clone(); - - // add all the normal inputs - for (PlanNode pred : getPredecessors()) { - - Costs parentCosts = pred.getCumulativeCostsShare(); - if (parentCosts != null) { - this.cumulativeCosts.addCosts(parentCosts); - } else { - throw new CompilerException("Trying to set the costs of an operator before the predecessor costs are computed."); - } - } - - // add all broadcast variable inputs - if (this.broadcastInputs != null) { - for (NamedChannel nc : this.broadcastInputs) { - Costs bcInputCost = nc.getSource().getCumulativeCostsShare(); - if (bcInputCost != null) { - this.cumulativeCosts.addCosts(bcInputCost); - } else { - throw new CompilerException("Trying to set the costs of an operator before the broadcast input costs are computed."); - } - } - } - } - - public void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - - public int getParallelism() { - return this.parallelism; - } - - public long getGuaranteedAvailableMemory() { - return this.template.getMinimalMemoryAcrossAllSubTasks(); - } - - public Map<OptimizerNode, PlanNode> getBranchPlan() { - return branchPlan; - } - - // -------------------------------------------------------------------------------------------- - // Input, Predecessors, Successors - // -------------------------------------------------------------------------------------------- - - public abstract Iterable<Channel> getInputs(); - - @Override - public abstract Iterable<PlanNode> getPredecessors(); - - /** - * Sets a list of all broadcast inputs attached to this node. - */ - public void setBroadcastInputs(List<NamedChannel> broadcastInputs) { - if (broadcastInputs != null) { - this.broadcastInputs = broadcastInputs; - - // update the branch map - for (NamedChannel nc : broadcastInputs) { - PlanNode source = nc.getSource(); - - mergeBranchPlanMaps(branchPlan, source.branchPlan); - } - } - - // do a sanity check that if we are branching, we have now candidates for each branch point - if (this.template.hasUnclosedBranches()) { - if (this.branchPlan == null) { - throw new CompilerException("Branching and rejoining logic did not find a candidate for the branching point."); - } - - for (UnclosedBranchDescriptor uc : this.template.getOpenBranches()) { - OptimizerNode brancher = uc.getBranchingNode(); - if (this.branchPlan.get(brancher) == null) { - throw new CompilerException("Branching and rejoining logic did not find a candidate for the branching point."); - } - } - } - } - - /** - * Gets a list of all broadcast inputs attached to this node. - */ - public List<NamedChannel> getBroadcastInputs() { - return this.broadcastInputs; - } - - /** - * Adds a channel to a successor node to this node. - * - * @param channel The channel to the successor. - */ - public void addOutgoingChannel(Channel channel) { - this.outChannels.add(channel); - } - - /** - * Gets a list of all outgoing channels leading to successors. - * - * @return A list of all channels leading to successors. - */ - public List<Channel> getOutgoingChannels() { - return this.outChannels; - } - - // -------------------------------------------------------------------------------------------- - // Miscellaneous - // -------------------------------------------------------------------------------------------- - - public void updatePropertiesWithUniqueSets(Set<FieldSet> uniqueFieldCombinations) { - if (uniqueFieldCombinations == null || uniqueFieldCombinations.isEmpty()) { - return; - } - for (FieldSet fields : uniqueFieldCombinations) { - this.globalProps.addUniqueFieldCombination(fields); - this.localProps = this.localProps.addUniqueFields(fields); - } - } - - public PlanNode getCandidateAtBranchPoint(OptimizerNode branchPoint) { - if (branchPlan == null) { - return null; - } else { - return this.branchPlan.get(branchPoint); - } - } - - /** - * Sets the pruning marker to true. - */ - public void setPruningMarker() { - this.pFlag = true; - } - - /** - * Checks whether the pruning marker was set. - * - * @return True, if the pruning marker was set, false otherwise. - */ - public boolean isPruneMarkerSet() { - return this.pFlag; - } - - public boolean isOnDynamicPath() { - return this.template.isOnDynamicPath(); - } - - public int getCostWeight() { - return this.template.getCostWeight(); - } - - // -------------------------------------------------------------------------------------------- - - /** - * Checks whether this node has a dam on the way down to the given source node. This method - * returns either that (a) the source node is not found as a (transitive) child of this node, - * (b) the node is found, but no dam is on the path, or (c) the node is found and a dam is on - * the path. - * - * @param source The node on the path to which the dam is sought. - * @return The result whether the node is found and whether a dam is on the path. - */ - public abstract SourceAndDamReport hasDamOnPathDownTo(PlanNode source); - - public FeedbackPropertiesMeetRequirementsReport checkPartialSolutionPropertiesMet(PlanNode partialSolution, GlobalProperties feedbackGlobal, LocalProperties feedbackLocal) { - if (this == partialSolution) { - return FeedbackPropertiesMeetRequirementsReport.PENDING; - } - - boolean found = false; - boolean allMet = true; - boolean allLocallyMet = true; - - for (Channel input : getInputs()) { - FeedbackPropertiesMeetRequirementsReport inputState = input.getSource().checkPartialSolutionPropertiesMet(partialSolution, feedbackGlobal, feedbackLocal); - - if (inputState == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) { - continue; - } - else if (inputState == FeedbackPropertiesMeetRequirementsReport.MET) { - found = true; - continue; - } - else if (inputState == FeedbackPropertiesMeetRequirementsReport.NOT_MET) { - return FeedbackPropertiesMeetRequirementsReport.NOT_MET; - } - else { - found = true; - - // the partial solution was on the path here. check whether the channel requires - // certain properties that are met, or whether the channel introduces new properties - - // if the plan introduces new global properties, then we can stop looking whether - // the feedback properties are sufficient to meet the requirements - if (input.getShipStrategy() != ShipStrategyType.FORWARD && input.getShipStrategy() != ShipStrategyType.NONE) { - continue; - } - - // first check whether this channel requires something that is not met - if (input.getRequiredGlobalProps() != null && !input.getRequiredGlobalProps().isMetBy(feedbackGlobal)) { - return FeedbackPropertiesMeetRequirementsReport.NOT_MET; - } - - // in general, not everything is met here already - allMet = false; - - // if the plan introduces new local properties, we can stop checking for matching local properties - if (inputState != FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET) { - - if (input.getLocalStrategy() == LocalStrategy.NONE) { - - if (input.getRequiredLocalProps() != null && !input.getRequiredLocalProps().isMetBy(feedbackLocal)) { - return FeedbackPropertiesMeetRequirementsReport.NOT_MET; - } - - allLocallyMet = false; - } - } - } - } - - if (!found) { - return FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION; - } else if (allMet) { - return FeedbackPropertiesMeetRequirementsReport.MET; - } else if (allLocallyMet) { - return FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET; - } else { - return FeedbackPropertiesMeetRequirementsReport.PENDING; - } - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public String toString() { - return this.template.getName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy + - " [[ " + this.globalProps + " ]] [[ " + this.localProps + " ]]"; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public OptimizerNode getOptimizerNode() { - return this.template; - } - - @Override - public PlanNode getPlanNode() { - return this; - } - - @Override - public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() { - List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>(); - - for (Channel c : getInputs()) { - allInputs.add(c); - } - - for (NamedChannel c : getBroadcastInputs()) { - allInputs.add(c); - } - - return allInputs; - } - - // -------------------------------------------------------------------------------------------- - - public static enum SourceAndDamReport { - NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM; - } - - - - public static enum FeedbackPropertiesMeetRequirementsReport { - /** Indicates that the path is irrelevant */ - NO_PARTIAL_SOLUTION, - - /** Indicates that the question whether the properties are met has been determined pending - * dependent on global and local properties */ - PENDING, - - /** Indicates that the question whether the properties are met has been determined pending - * dependent on global properties only */ - PENDING_LOCAL_MET, - - /** Indicates that the question whether the properties are met has been determined true */ - MET, - - /** Indicates that the question whether the properties are met has been determined false */ - NOT_MET; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java deleted file mode 100644 index b928be7..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.plan; - -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; - -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.common.typeutils.TypeComparatorFactory; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.runtime.operators.DamBehavior; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.util.Visitor; - -/** - * - */ -public class SingleInputPlanNode extends PlanNode { - - protected final Channel input; - - protected final FieldList[] driverKeys; - - protected final boolean[][] driverSortOrders; - - private TypeComparatorFactory<?>[] comparators; - - public Object postPassHelper; - - // -------------------------------------------------------------------------------------------- - - public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input, DriverStrategy driverStrategy) { - this(template, nodeName, input, driverStrategy, null, null); - } - - public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input, - DriverStrategy driverStrategy, FieldList driverKeyFields) - { - this(template, nodeName, input, driverStrategy, driverKeyFields, getTrueArray(driverKeyFields.size())); - } - - public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input, - DriverStrategy driverStrategy, FieldList driverKeyFields, boolean[] driverSortOrders) - { - super(template, nodeName, driverStrategy); - this.input = input; - - this.comparators = new TypeComparatorFactory<?>[driverStrategy.getNumRequiredComparators()]; - this.driverKeys = new FieldList[driverStrategy.getNumRequiredComparators()]; - this.driverSortOrders = new boolean[driverStrategy.getNumRequiredComparators()][]; - - if(driverStrategy.getNumRequiredComparators() > 0) { - this.driverKeys[0] = driverKeyFields; - this.driverSortOrders[0] = driverSortOrders; - } - - if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) { - this.input.setReplicationFactor(getParallelism()); - } - - final PlanNode predNode = input.getSource(); - - if (predNode.branchPlan != null && !predNode.branchPlan.isEmpty()) { - - if (this.branchPlan == null) { - this.branchPlan = new HashMap<OptimizerNode, PlanNode>(); - } - this.branchPlan.putAll(predNode.branchPlan); - } - } - - // -------------------------------------------------------------------------------------------- - - public SingleInputNode getSingleInputNode() { - if (this.template instanceof SingleInputNode) { - return (SingleInputNode) this.template; - } else { - throw new RuntimeException(); - } - } - - /** - * Gets the input channel to this node. - * - * @return The input channel to this node. - */ - public Channel getInput() { - return this.input; - } - - /** - * Gets the predecessor of this node, i.e. the source of the input channel. - * - * @return The predecessor of this node. - */ - public PlanNode getPredecessor() { - return this.input.getSource(); - } - - /** - * Sets the key field indexes for the specified driver comparator. - * - * @param keys The key field indexes for the specified driver comparator. - * @param id The ID of the driver comparator. - */ - public void setDriverKeyInfo(FieldList keys, int id) { - this.setDriverKeyInfo(keys, getTrueArray(keys.size()), id); - } - - /** - * Sets the key field information for the specified driver comparator. - * - * @param keys The key field indexes for the specified driver comparator. - * @param sortOrder The key sort order for the specified driver comparator. - * @param id The ID of the driver comparator. - */ - public void setDriverKeyInfo(FieldList keys, boolean[] sortOrder, int id) { - if(id < 0 || id >= driverKeys.length) { - throw new CompilerException("Invalid id for driver key information. DriverStrategy requires only " - +super.getDriverStrategy().getNumRequiredComparators()+" comparators."); - } - this.driverKeys[id] = keys; - this.driverSortOrders[id] = sortOrder; - } - - /** - * Gets the key field indexes for the specified driver comparator. - * - * @param id The id of the driver comparator for which the key field indexes are requested. - * @return The key field indexes of the specified driver comparator. - */ - public FieldList getKeys(int id) { - return this.driverKeys[id]; - } - - /** - * Gets the sort order for the specified driver comparator. - * - * @param id The id of the driver comparator for which the sort order is requested. - * @return The sort order of the specified driver comparator. - */ - public boolean[] getSortOrders(int id) { - return driverSortOrders[id]; - } - - /** - * Gets the specified comparator from this PlanNode. - * - * @param id The ID of the requested comparator. - * - * @return The specified comparator. - */ - public TypeComparatorFactory<?> getComparator(int id) { - return comparators[id]; - } - - /** - * Sets the specified comparator for this PlanNode. - * - * @param comparator The comparator to set. - * @param id The ID of the comparator to set. - */ - public void setComparator(TypeComparatorFactory<?> comparator, int id) { - this.comparators[id] = comparator; - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public void accept(Visitor<PlanNode> visitor) { - if (visitor.preVisit(this)) { - this.input.getSource().accept(visitor); - - for (Channel broadcastInput : getBroadcastInputs()) { - broadcastInput.getSource().accept(visitor); - } - - visitor.postVisit(this); - } - } - - - @Override - public Iterable<PlanNode> getPredecessors() { - if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) { - return Collections.singleton(this.input.getSource()); - } - else { - List<PlanNode> preds = new ArrayList<PlanNode>(); - preds.add(input.getSource()); - - for (Channel c : getBroadcastInputs()) { - preds.add(c.getSource()); - } - - return preds; - } - } - - - @Override - public Iterable<Channel> getInputs() { - return Collections.singleton(this.input); - } - - @Override - public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { - if (source == this) { - return FOUND_SOURCE; - } - SourceAndDamReport res = this.input.getSource().hasDamOnPathDownTo(source); - if (res == FOUND_SOURCE_AND_DAM) { - return FOUND_SOURCE_AND_DAM; - } - else if (res == FOUND_SOURCE) { - return (this.input.getLocalStrategy().dams() || this.input.getTempMode().breaksPipeline() || - getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ? - FOUND_SOURCE_AND_DAM : FOUND_SOURCE; - } - else { - // NOT_FOUND - // check the broadcast inputs - - for (NamedChannel nc : getBroadcastInputs()) { - SourceAndDamReport bcRes = nc.getSource().hasDamOnPathDownTo(source); - if (bcRes != NOT_FOUND) { - // broadcast inputs are always dams - return FOUND_SOURCE_AND_DAM; - } - } - return NOT_FOUND; - } - } - - // -------------------------------------------------------------------------------------------- - - protected static boolean[] getTrueArray(int length) { - final boolean[] a = new boolean[length]; - for (int i = 0; i < length; i++) { - a[i] = true; - } - return a; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java deleted file mode 100644 index 451484d..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plan; - -import java.util.List; - -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.SinkJoiner; -import org.apache.flink.runtime.operators.DriverStrategy; - - -/** - * - */ -public class SinkJoinerPlanNode extends DualInputPlanNode { - - public SinkJoinerPlanNode(SinkJoiner template, Channel input1, Channel input2) { - super(template, "", input1, input2, DriverStrategy.BINARY_NO_OP); - } - - // -------------------------------------------------------------------------------------------- - - public void setCosts(Costs nodeCosts) { - // the plan enumeration logic works as for regular two-input-operators, which is important - // because of the branch handling logic. it does pick redistributing network channels - // between the sink and the sink joiner, because sinks joiner has a different DOP than the sink. - // we discard any cost and simply use the sum of the costs from the two children. - - Costs totalCosts = getInput1().getSource().getCumulativeCosts().clone(); - totalCosts.addCosts(getInput2().getSource().getCumulativeCosts()); - super.setCosts(totalCosts); - } - - // -------------------------------------------------------------------------------------------- - - public void getDataSinks(List<SinkPlanNode> sinks) { - final PlanNode in1 = this.input1.getSource(); - final PlanNode in2 = this.input2.getSource(); - - if (in1 instanceof SinkPlanNode) { - sinks.add((SinkPlanNode) in1); - } else if (in1 instanceof SinkJoinerPlanNode) { - ((SinkJoinerPlanNode) in1).getDataSinks(sinks); - } else { - throw new CompilerException("Illegal child node for a sink joiner utility node: Neither Sink nor Sink Joiner"); - } - - if (in2 instanceof SinkPlanNode) { - sinks.add((SinkPlanNode) in2); - } else if (in2 instanceof SinkJoinerPlanNode) { - ((SinkJoinerPlanNode) in2).getDataSinks(sinks); - } else { - throw new CompilerException("Illegal child node for a sink joiner utility node: Neither Sink nor Sink Joiner"); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java deleted file mode 100644 index 656e67f..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.plan; - -import org.apache.flink.optimizer.dag.DataSinkNode; -import org.apache.flink.runtime.operators.DriverStrategy; - -/** - * Plan candidate node for data flow sinks. - */ -public class SinkPlanNode extends SingleInputPlanNode -{ - /** - * Constructs a new sink candidate node that uses <i>NONE</i> as its local strategy. Note that - * local sorting and range partitioning are handled by the incoming channel already. - * - * @param template The template optimizer node that this candidate is created for. - */ - public SinkPlanNode(DataSinkNode template, String nodeName, Channel input) { - super(template, nodeName, input, DriverStrategy.NONE); - - this.globalProps = input.getGlobalProperties().clone(); - this.localProps = input.getLocalProperties().clone(); - } - - public DataSinkNode getSinkNode() { - if (this.template instanceof DataSinkNode) { - return (DataSinkNode) this.template; - } else { - throw new RuntimeException(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java deleted file mode 100644 index 63093dd..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.plan; - -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND; - -import java.util.Collections; -import java.util.HashMap; - -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.optimizer.dag.SolutionSetNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.util.Visitor; - -/** - * Plan candidate node for partial solution of a bulk iteration. - */ -public class SolutionSetPlanNode extends PlanNode { - - private static final Costs NO_COSTS = new Costs(); - - private WorksetIterationPlanNode containingIterationNode; - - private final Channel initialInput; - - public Object postPassHelper; - - - public SolutionSetPlanNode(SolutionSetNode template, String nodeName, - GlobalProperties gProps, LocalProperties lProps, - Channel initialInput) - { - super(template, nodeName, DriverStrategy.NONE); - - this.globalProps = gProps; - this.localProps = lProps; - this.initialInput = initialInput; - - // the node incurs no cost - this.nodeCosts = NO_COSTS; - this.cumulativeCosts = NO_COSTS; - - if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) { - if (this.branchPlan == null) { - this.branchPlan = new HashMap<OptimizerNode, PlanNode>(); - } - - this.branchPlan.putAll(initialInput.getSource().branchPlan); - } - } - - // -------------------------------------------------------------------------------------------- - - public SolutionSetNode getSolutionSetNode() { - return (SolutionSetNode) this.template; - } - - public WorksetIterationPlanNode getContainingIterationNode() { - return this.containingIterationNode; - } - - public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) { - this.containingIterationNode = containingIterationNode; - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public void accept(Visitor<PlanNode> visitor) { - if (visitor.preVisit(this)) { - visitor.postVisit(this); - } - } - - - @Override - public Iterable<PlanNode> getPredecessors() { - return Collections.<PlanNode>emptyList(); - } - - - @Override - public Iterable<Channel> getInputs() { - return Collections.<Channel>emptyList(); - } - - - @Override - public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { - if (source == this) { - return FOUND_SOURCE_AND_DAM; - } - - SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source); - if (res == FOUND_SOURCE_AND_DAM || res == FOUND_SOURCE) { - return FOUND_SOURCE_AND_DAM; - } else { - return NOT_FOUND; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java deleted file mode 100644 index 11b7cc9..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.plan; - -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND; - -import java.util.Collections; - -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.optimizer.dag.DataSourceNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.util.Visitor; - -/** - * Plan candidate node for data flow sources that have no input and no special strategies. - */ -public class SourcePlanNode extends PlanNode { - - private TypeSerializerFactory<?> serializer; - - /** - * Constructs a new source candidate node that uses <i>NONE</i> as its local strategy. - * - * @param template The template optimizer node that this candidate is created for. - */ - public SourcePlanNode(DataSourceNode template, String nodeName) { - this(template, nodeName, new GlobalProperties(), new LocalProperties()); - } - - public SourcePlanNode(DataSourceNode template, String nodeName, GlobalProperties gprops, LocalProperties lprops) { - super(template, nodeName, DriverStrategy.NONE); - - this.globalProps = gprops; - this.localProps = lprops; - updatePropertiesWithUniqueSets(template.getUniqueFields()); - } - - // -------------------------------------------------------------------------------------------- - - public DataSourceNode getDataSourceNode() { - return (DataSourceNode) this.template; - } - - /** - * Gets the serializer from this PlanNode. - * - * @return The serializer. - */ - public TypeSerializerFactory<?> getSerializer() { - return serializer; - } - - /** - * Sets the serializer for this PlanNode. - * - * @param serializer The serializer to set. - */ - public void setSerializer(TypeSerializerFactory<?> serializer) { - this.serializer = serializer; - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public void accept(Visitor<PlanNode> visitor) { - if (visitor.preVisit(this)) { - visitor.postVisit(this); - } - } - - - @Override - public Iterable<PlanNode> getPredecessors() { - return Collections.<PlanNode>emptyList(); - } - - - @Override - public Iterable<Channel> getInputs() { - return Collections.<Channel>emptyList(); - } - - - @Override - public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { - if (source == this) { - return FOUND_SOURCE; - } else { - return NOT_FOUND; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java deleted file mode 100644 index 880f2e3..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plan; - -import java.io.File; -import java.io.IOException; - -import org.apache.flink.runtime.jobgraph.JobGraph; - -/** - * Abstract class representing Flink Streaming plans - * - */ -public abstract class StreamingPlan implements FlinkPlan { - - public abstract JobGraph getJobGraph(); - - public abstract String getStreamingPlanAsJSON(); - - public abstract void dumpStreamingPlanAsJSON(File file) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java deleted file mode 100644 index 95adace..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plan; - -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.common.typeutils.TypeComparatorFactory; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.optimizer.dag.WorksetIterationNode; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.util.Visitor; - -/** - * A node in the execution, representing a workset iteration (delta iteration). - */ -public class WorksetIterationPlanNode extends DualInputPlanNode implements IterationPlanNode { - - private final SolutionSetPlanNode solutionSetPlanNode; - - private final WorksetPlanNode worksetPlanNode; - - private final PlanNode solutionSetDeltaPlanNode; - - private final PlanNode nextWorkSetPlanNode; - - private TypeSerializerFactory<?> worksetSerializer; - - private TypeSerializerFactory<?> solutionSetSerializer; - - private TypeComparatorFactory<?> solutionSetComparator; - - private boolean immediateSolutionSetUpdate; - - public Object postPassHelper; - - private TypeSerializerFactory<?> serializerForIterationChannel; - - // -------------------------------------------------------------------------------------------- - - public WorksetIterationPlanNode(WorksetIterationNode template, String nodeName, Channel initialSolutionSet, Channel initialWorkset, - SolutionSetPlanNode solutionSetPlanNode, WorksetPlanNode worksetPlanNode, - PlanNode nextWorkSetPlanNode, PlanNode solutionSetDeltaPlanNode) - { - super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.BINARY_NO_OP); - this.solutionSetPlanNode = solutionSetPlanNode; - this.worksetPlanNode = worksetPlanNode; - this.solutionSetDeltaPlanNode = solutionSetDeltaPlanNode; - this.nextWorkSetPlanNode = nextWorkSetPlanNode; - - mergeBranchPlanMaps(); - - } - - // -------------------------------------------------------------------------------------------- - - public WorksetIterationNode getIterationNode() { - if (this.template instanceof WorksetIterationNode) { - return (WorksetIterationNode) this.template; - } else { - throw new RuntimeException(); - } - } - - public SolutionSetPlanNode getSolutionSetPlanNode() { - return this.solutionSetPlanNode; - } - - public WorksetPlanNode getWorksetPlanNode() { - return this.worksetPlanNode; - } - - public PlanNode getSolutionSetDeltaPlanNode() { - return this.solutionSetDeltaPlanNode; - } - - public PlanNode getNextWorkSetPlanNode() { - return this.nextWorkSetPlanNode; - } - - public Channel getInitialSolutionSetInput() { - return getInput1(); - } - - public Channel getInitialWorksetInput() { - return getInput2(); - } - - public void setImmediateSolutionSetUpdate(boolean immediateUpdate) { - this.immediateSolutionSetUpdate = immediateUpdate; - } - - public boolean isImmediateSolutionSetUpdate() { - return this.immediateSolutionSetUpdate; - } - - public FieldList getSolutionSetKeyFields() { - return getIterationNode().getSolutionSetKeyFields(); - } - - // -------------------------------------------------------------------------------------------- - - public TypeSerializerFactory<?> getWorksetSerializer() { - return worksetSerializer; - } - - public void setWorksetSerializer(TypeSerializerFactory<?> worksetSerializer) { - this.worksetSerializer = worksetSerializer; - } - - public TypeSerializerFactory<?> getSolutionSetSerializer() { - return solutionSetSerializer; - } - - public void setSolutionSetSerializer(TypeSerializerFactory<?> solutionSetSerializer) { - this.solutionSetSerializer = solutionSetSerializer; - } - - public TypeComparatorFactory<?> getSolutionSetComparator() { - return solutionSetComparator; - } - - public void setSolutionSetComparator(TypeComparatorFactory<?> solutionSetComparator) { - this.solutionSetComparator = solutionSetComparator; - } - - // -------------------------------------------------------------------------------------------- - - public void setCosts(Costs nodeCosts) { - // add the costs from the step function - nodeCosts.addCosts(this.solutionSetDeltaPlanNode.getCumulativeCostsShare()); - nodeCosts.addCosts(this.nextWorkSetPlanNode.getCumulativeCostsShare()); - - super.setCosts(nodeCosts); - } - - public int getMemoryConsumerWeight() { - // solution set index and workset back channel - return 2; - } - - @Override - public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { - if (source == this) { - return FOUND_SOURCE; - } - - SourceAndDamReport fromOutside = super.hasDamOnPathDownTo(source); - - if (fromOutside == FOUND_SOURCE_AND_DAM) { - return FOUND_SOURCE_AND_DAM; - } - else if (fromOutside == FOUND_SOURCE) { - // we always have a dam in the solution set index - return FOUND_SOURCE_AND_DAM; - } else { - SourceAndDamReport fromNextWorkset = nextWorkSetPlanNode.hasDamOnPathDownTo(source); - - if (fromNextWorkset == FOUND_SOURCE_AND_DAM){ - return FOUND_SOURCE_AND_DAM; - } else if (fromNextWorkset == FOUND_SOURCE){ - return FOUND_SOURCE_AND_DAM; - } else { - return this.solutionSetDeltaPlanNode.hasDamOnPathDownTo(source); - } - } - } - - @Override - public void acceptForStepFunction(Visitor<PlanNode> visitor) { - this.solutionSetDeltaPlanNode.accept(visitor); - this.nextWorkSetPlanNode.accept(visitor); - } - - /** - * Merging can only take place after the solutionSetDelta and nextWorkset PlanNode has been set, - * because they can contain also some of the branching nodes. - */ - @Override - protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode,PlanNode> branchPlan2) {} - - - protected void mergeBranchPlanMaps() { - Map<OptimizerNode, PlanNode> branchPlan1 = input1.getSource().branchPlan; - Map<OptimizerNode, PlanNode> branchPlan2 = input2.getSource().branchPlan; - - // merge the branchPlan maps according the template's uncloseBranchesStack - if (this.template.hasUnclosedBranches()) { - if (this.branchPlan == null) { - this.branchPlan = new HashMap<OptimizerNode, PlanNode>(8); - } - - for (OptimizerNode.UnclosedBranchDescriptor uc : this.template.getOpenBranches()) { - OptimizerNode brancher = uc.getBranchingNode(); - PlanNode selectedCandidate = null; - - if (branchPlan1 != null) { - // predecessor 1 has branching children, see if it got the branch we are looking for - selectedCandidate = branchPlan1.get(brancher); - } - - if (selectedCandidate == null && branchPlan2 != null) { - // predecessor 2 has branching children, see if it got the branch we are looking for - selectedCandidate = branchPlan2.get(brancher); - } - - if(selectedCandidate == null && getSolutionSetDeltaPlanNode() != null && getSolutionSetDeltaPlanNode() - .branchPlan != null){ - selectedCandidate = getSolutionSetDeltaPlanNode().branchPlan.get(brancher); - } - - if(selectedCandidate == null && getNextWorkSetPlanNode() != null && getNextWorkSetPlanNode() - .branchPlan != null){ - selectedCandidate = getNextWorkSetPlanNode().branchPlan.get(brancher); - } - - if (selectedCandidate == null) { - throw new CompilerException( - "Candidates for a node with open branches are missing information about the selected candidate "); - } - - this.branchPlan.put(brancher, selectedCandidate); - } - } - } - - // -------------------------------------------------------------------------------------------- - - public TypeSerializerFactory<?> getSerializerForIterationChannel() { - return serializerForIterationChannel; - } - - public void setSerializerForIterationChannel(TypeSerializerFactory<?> serializerForIterationChannel) { - this.serializerForIterationChannel = serializerForIterationChannel; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java deleted file mode 100644 index 8d044d6..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.plan; - -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND; - -import java.util.Collections; -import java.util.HashMap; - -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.optimizer.dag.WorksetNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.runtime.operators.DamBehavior; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.util.Visitor; - -/** - * Plan candidate node for partial solution of a bulk iteration. - */ -public class WorksetPlanNode extends PlanNode { - - private static final Costs NO_COSTS = new Costs(); - - private WorksetIterationPlanNode containingIterationNode; - - private final Channel initialInput; - - public Object postPassHelper; - - - public WorksetPlanNode(WorksetNode template, String nodeName, - GlobalProperties gProps, LocalProperties lProps, - Channel initialInput) - { - super(template, nodeName, DriverStrategy.NONE); - - this.globalProps = gProps; - this.localProps = lProps; - this.initialInput = initialInput; - - // the node incurs no cost - this.nodeCosts = NO_COSTS; - this.cumulativeCosts = NO_COSTS; - - if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) { - if (this.branchPlan == null) { - this.branchPlan = new HashMap<OptimizerNode, PlanNode>(); - } - - this.branchPlan.putAll(initialInput.getSource().branchPlan); - } - } - - // -------------------------------------------------------------------------------------------- - - public WorksetNode getWorksetNode() { - return (WorksetNode) this.template; - } - - public WorksetIterationPlanNode getContainingIterationNode() { - return this.containingIterationNode; - } - - public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) { - this.containingIterationNode = containingIterationNode; - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public void accept(Visitor<PlanNode> visitor) { - if (visitor.preVisit(this)) { - visitor.postVisit(this); - } - } - - - @Override - public Iterable<PlanNode> getPredecessors() { - return Collections.<PlanNode>emptyList(); - } - - - @Override - public Iterable<Channel> getInputs() { - return Collections.<Channel>emptyList(); - } - - - @Override - public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { - if (source == this) { - return FOUND_SOURCE; - } - SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source); - if (res == FOUND_SOURCE_AND_DAM) { - return FOUND_SOURCE_AND_DAM; - } - else if (res == FOUND_SOURCE) { - return (this.initialInput.getLocalStrategy().dams() || - this.initialInput.getTempMode().breaksPipeline() || - getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ? - FOUND_SOURCE_AND_DAM : FOUND_SOURCE; - } - else { - return NOT_FOUND; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java deleted file mode 100644 index 3f8cb46..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plandump; - -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; - - -/** - * - */ -public interface DumpableConnection<T extends DumpableNode<T>> { - - public DumpableNode<T> getSource(); - - public ShipStrategyType getShipStrategy(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java deleted file mode 100644 index 1bc0f0c..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plandump; - -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.optimizer.plan.PlanNode; - -/** - * - */ -public interface DumpableNode<T extends DumpableNode<T>> { - - /** - * Gets an iterator over the predecessors. - * - * @return An iterator over the predecessors. - */ - Iterable<T> getPredecessors(); - - Iterable<DumpableConnection<T>> getDumpableInputs(); - - OptimizerNode getOptimizerNode(); - - PlanNode getPlanNode(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java deleted file mode 100644 index 6f918c0..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java +++ /dev/null @@ -1,657 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plandump; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.Map; - -import org.apache.commons.lang3.StringEscapeUtils; -import org.apache.flink.api.common.operators.CompilerHints; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.BinaryUnionNode; -import org.apache.flink.optimizer.dag.BulkIterationNode; -import org.apache.flink.optimizer.dag.DataSinkNode; -import org.apache.flink.optimizer.dag.DataSourceNode; -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.optimizer.dag.DagConnection; -import org.apache.flink.optimizer.dag.TempMode; -import org.apache.flink.optimizer.dag.WorksetIterationNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.plan.BulkIterationPlanNode; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; -import org.apache.flink.optimizer.util.Utils; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.util.StringUtils; - -/** - * - */ -public class PlanJSONDumpGenerator { - - private Map<DumpableNode<?>, Integer> nodeIds; // resolves pact nodes to ids - - private int nodeCnt; - - private boolean encodeForHTML; - - // -------------------------------------------------------------------------------------------- - - public void setEncodeForHTML(boolean encodeForHTML) { - this.encodeForHTML = encodeForHTML; - } - - public boolean isEncodeForHTML() { - return encodeForHTML; - } - - - public void dumpPactPlanAsJSON(List<DataSinkNode> nodes, PrintWriter writer) { - @SuppressWarnings("unchecked") - List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes; - compilePlanToJSON(n, writer); - } - - public String getPactPlanAsJSON(List<DataSinkNode> nodes) { - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - dumpPactPlanAsJSON(nodes, pw); - return sw.toString(); - } - - public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, File toFile) throws IOException { - PrintWriter pw = null; - try { - pw = new PrintWriter(new FileOutputStream(toFile), false); - dumpOptimizerPlanAsJSON(plan, pw); - pw.flush(); - } finally { - if (pw != null) { - pw.close(); - } - } - } - - public String getOptimizerPlanAsJSON(OptimizedPlan plan) { - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - dumpOptimizerPlanAsJSON(plan, pw); - pw.close(); - return sw.toString(); - } - - public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, PrintWriter writer) { - Collection<SinkPlanNode> sinks = plan.getDataSinks(); - if (sinks instanceof List) { - dumpOptimizerPlanAsJSON((List<SinkPlanNode>) sinks, writer); - } else { - List<SinkPlanNode> n = new ArrayList<SinkPlanNode>(); - n.addAll(sinks); - dumpOptimizerPlanAsJSON(n, writer); - } - } - - public void dumpOptimizerPlanAsJSON(List<SinkPlanNode> nodes, PrintWriter writer) { - @SuppressWarnings("unchecked") - List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes; - compilePlanToJSON(n, writer); - } - - // -------------------------------------------------------------------------------------------- - - private void compilePlanToJSON(List<DumpableNode<?>> nodes, PrintWriter writer) { - // initialization to assign node ids - this.nodeIds = new HashMap<DumpableNode<?>, Integer>(); - this.nodeCnt = 0; - - // JSON header - writer.print("{\n\t\"nodes\": [\n\n"); - - // Generate JSON for plan - for (int i = 0; i < nodes.size(); i++) { - visit(nodes.get(i), writer, i == 0); - } - - // JSON Footer - writer.println("\n\t]\n}"); - } - - private boolean visit(DumpableNode<?> node, PrintWriter writer, boolean first) { - // check for duplicate traversal - if (this.nodeIds.containsKey(node)) { - return false; - } - - // assign an id first - this.nodeIds.put(node, this.nodeCnt++); - - // then recurse - for (DumpableNode<?> child : node.getPredecessors()) { - //This is important, because when the node was already in the graph it is not allowed - //to set first to false! - if (visit(child, writer, first)) { - first = false; - }; - } - - // check if this node should be skipped from the dump - final OptimizerNode n = node.getOptimizerNode(); - - // ------------------ dump after the ascend --------------------- - // start a new node and output node id - if (!first) { - writer.print(",\n"); - } - // open the node - writer.print("\t{\n"); - - // recurse, it is is an iteration node - if (node instanceof BulkIterationNode || node instanceof BulkIterationPlanNode) { - - DumpableNode<?> innerChild = node instanceof BulkIterationNode ? - ((BulkIterationNode) node).getNextPartialSolution() : - ((BulkIterationPlanNode) node).getRootOfStepFunction(); - - DumpableNode<?> begin = node instanceof BulkIterationNode ? - ((BulkIterationNode) node).getPartialSolution() : - ((BulkIterationPlanNode) node).getPartialSolutionPlanNode(); - - writer.print("\t\t\"step_function\": [\n"); - - visit(innerChild, writer, true); - - writer.print("\n\t\t],\n"); - writer.print("\t\t\"partial_solution\": " + this.nodeIds.get(begin) + ",\n"); - writer.print("\t\t\"next_partial_solution\": " + this.nodeIds.get(innerChild) + ",\n"); - } else if (node instanceof WorksetIterationNode || node instanceof WorksetIterationPlanNode) { - - DumpableNode<?> worksetRoot = node instanceof WorksetIterationNode ? - ((WorksetIterationNode) node).getNextWorkset() : - ((WorksetIterationPlanNode) node).getNextWorkSetPlanNode(); - DumpableNode<?> solutionDelta = node instanceof WorksetIterationNode ? - ((WorksetIterationNode) node).getSolutionSetDelta() : - ((WorksetIterationPlanNode) node).getSolutionSetDeltaPlanNode(); - - DumpableNode<?> workset = node instanceof WorksetIterationNode ? - ((WorksetIterationNode) node).getWorksetNode() : - ((WorksetIterationPlanNode) node).getWorksetPlanNode(); - DumpableNode<?> solutionSet = node instanceof WorksetIterationNode ? - ((WorksetIterationNode) node).getSolutionSetNode() : - ((WorksetIterationPlanNode) node).getSolutionSetPlanNode(); - - writer.print("\t\t\"step_function\": [\n"); - - visit(worksetRoot, writer, true); - visit(solutionDelta, writer, false); - - writer.print("\n\t\t],\n"); - writer.print("\t\t\"workset\": " + this.nodeIds.get(workset) + ",\n"); - writer.print("\t\t\"solution_set\": " + this.nodeIds.get(solutionSet) + ",\n"); - writer.print("\t\t\"next_workset\": " + this.nodeIds.get(worksetRoot) + ",\n"); - writer.print("\t\t\"solution_delta\": " + this.nodeIds.get(solutionDelta) + ",\n"); - } - - // print the id - writer.print("\t\t\"id\": " + this.nodeIds.get(node)); - - - final String type; - String contents; - if (n instanceof DataSinkNode) { - type = "sink"; - contents = n.getOperator().toString(); - } else if (n instanceof DataSourceNode) { - type = "source"; - contents = n.getOperator().toString(); - } - else if (n instanceof BulkIterationNode) { - type = "bulk_iteration"; - contents = n.getOperator().getName(); - } - else if (n instanceof WorksetIterationNode) { - type = "workset_iteration"; - contents = n.getOperator().getName(); - } - else if (n instanceof BinaryUnionNode) { - type = "pact"; - contents = ""; - } - else { - type = "pact"; - contents = n.getOperator().getName(); - } - - contents = StringUtils.showControlCharacters(contents); - if (encodeForHTML) { - contents = StringEscapeUtils.escapeHtml4(contents); - contents = contents.replace("\\", "\"); - } - - - String name = n.getName(); - if (name.equals("Reduce") && (node instanceof SingleInputPlanNode) && - ((SingleInputPlanNode) node).getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE) { - name = "Combine"; - } - - // output the type identifier - writer.print(",\n\t\t\"type\": \"" + type + "\""); - - // output node name - writer.print(",\n\t\t\"pact\": \"" + name + "\""); - - // output node contents - writer.print(",\n\t\t\"contents\": \"" + contents + "\""); - - // degree of parallelism - writer.print(",\n\t\t\"parallelism\": \"" - + (n.getParallelism() >= 1 ? n.getParallelism() : "default") + "\""); - - // output node predecessors - Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator(); - String child1name = "", child2name = ""; - - if (inConns != null && inConns.hasNext()) { - // start predecessor list - writer.print(",\n\t\t\"predecessors\": ["); - int inputNum = 0; - - while (inConns.hasNext()) { - final DumpableConnection<?> inConn = inConns.next(); - final DumpableNode<?> source = inConn.getSource(); - writer.print(inputNum == 0 ? "\n" : ",\n"); - if (inputNum == 0) { - child1name += child1name.length() > 0 ? ", " : ""; - child1name += source.getOptimizerNode().getOperator().getName(); - } else if (inputNum == 1) { - child2name += child2name.length() > 0 ? ", " : ""; - child2name = source.getOptimizerNode().getOperator().getName(); - } - - // output predecessor id - writer.print("\t\t\t{\"id\": " + this.nodeIds.get(source)); - - // output connection side - if (inConns.hasNext() || inputNum > 0) { - writer.print(", \"side\": \"" + (inputNum == 0 ? "first" : "second") + "\""); - } - // output shipping strategy and channel type - final Channel channel = (inConn instanceof Channel) ? (Channel) inConn : null; - final ShipStrategyType shipType = channel != null ? channel.getShipStrategy() : - ((DagConnection) inConn).getShipStrategy(); - - String shipStrategy = null; - if (shipType != null) { - switch (shipType) { - case NONE: - // nothing - break; - case FORWARD: - shipStrategy = "Forward"; - break; - case BROADCAST: - shipStrategy = "Broadcast"; - break; - case PARTITION_HASH: - shipStrategy = "Hash Partition"; - break; - case PARTITION_RANGE: - shipStrategy = "Range Partition"; - break; - case PARTITION_RANDOM: - shipStrategy = "Redistribute"; - break; - case PARTITION_FORCED_REBALANCE: - shipStrategy = "Rebalance"; - break; - case PARTITION_CUSTOM: - shipStrategy = "Custom Partition"; - break; - default: - throw new CompilerException("Unknown ship strategy '" + inConn.getShipStrategy().name() - + "' in JSON generator."); - } - } - - if (channel != null && channel.getShipStrategyKeys() != null && channel.getShipStrategyKeys().size() > 0) { - shipStrategy += " on " + (channel.getShipStrategySortOrder() == null ? - channel.getShipStrategyKeys().toString() : - Utils.createOrdering(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder()).toString()); - } - - if (shipStrategy != null) { - writer.print(", \"ship_strategy\": \"" + shipStrategy + "\""); - } - - if (channel != null) { - String localStrategy = null; - switch (channel.getLocalStrategy()) { - case NONE: - break; - case SORT: - localStrategy = "Sort"; - break; - case COMBININGSORT: - localStrategy = "Sort (combining)"; - break; - default: - throw new CompilerException("Unknown local strategy " + channel.getLocalStrategy().name()); - } - - if (channel != null && channel.getLocalStrategyKeys() != null && channel.getLocalStrategyKeys().size() > 0) { - localStrategy += " on " + (channel.getLocalStrategySortOrder() == null ? - channel.getLocalStrategyKeys().toString() : - Utils.createOrdering(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder()).toString()); - } - - if (localStrategy != null) { - writer.print(", \"local_strategy\": \"" + localStrategy + "\""); - } - - if (channel != null && channel.getTempMode() != TempMode.NONE) { - String tempMode = channel.getTempMode().toString(); - writer.print(", \"temp_mode\": \"" + tempMode + "\""); - } - } - - writer.print('}'); - inputNum++; - } - // finish predecessors - writer.print("\n\t\t]"); - } - - //--------------------------------------------------------------------------------------- - // the part below here is relevant only to plan nodes with concrete strategies, etc - //--------------------------------------------------------------------------------------- - - final PlanNode p = node.getPlanNode(); - if (p == null) { - // finish node - writer.print("\n\t}"); - return true; - } - // local strategy - String locString = null; - if (p.getDriverStrategy() != null) { - switch (p.getDriverStrategy()) { - case NONE: - case BINARY_NO_OP: - break; - - case UNARY_NO_OP: - locString = "No-Op"; - break; - - case COLLECTOR_MAP: - case MAP: - locString = "Map"; - break; - - case FLAT_MAP: - locString = "FlatMap"; - break; - - case MAP_PARTITION: - locString = "Map Partition"; - break; - - case ALL_REDUCE: - locString = "Reduce All"; - break; - - case ALL_GROUP_REDUCE: - case ALL_GROUP_REDUCE_COMBINE: - locString = "Group Reduce All"; - break; - - case SORTED_REDUCE: - locString = "Sorted Reduce"; - break; - - case SORTED_PARTIAL_REDUCE: - locString = "Sorted Combine/Reduce"; - break; - - case SORTED_GROUP_REDUCE: - locString = "Sorted Group Reduce"; - break; - - case SORTED_GROUP_COMBINE: - locString = "Sorted Combine"; - break; - - case HYBRIDHASH_BUILD_FIRST: - locString = "Hybrid Hash (build: " + child1name + ")"; - break; - case HYBRIDHASH_BUILD_SECOND: - locString = "Hybrid Hash (build: " + child2name + ")"; - break; - - case HYBRIDHASH_BUILD_FIRST_CACHED: - locString = "Hybrid Hash (CACHED) (build: " + child1name + ")"; - break; - case HYBRIDHASH_BUILD_SECOND_CACHED: - locString = "Hybrid Hash (CACHED) (build: " + child2name + ")"; - break; - - case NESTEDLOOP_BLOCKED_OUTER_FIRST: - locString = "Nested Loops (Blocked Outer: " + child1name + ")"; - break; - case NESTEDLOOP_BLOCKED_OUTER_SECOND: - locString = "Nested Loops (Blocked Outer: " + child2name + ")"; - break; - case NESTEDLOOP_STREAMED_OUTER_FIRST: - locString = "Nested Loops (Streamed Outer: " + child1name + ")"; - break; - case NESTEDLOOP_STREAMED_OUTER_SECOND: - locString = "Nested Loops (Streamed Outer: " + child2name + ")"; - break; - - case MERGE: - locString = "Merge"; - break; - - case CO_GROUP: - locString = "Co-Group"; - break; - - default: - locString = p.getDriverStrategy().name(); - break; - } - - if (locString != null) { - writer.print(",\n\t\t\"driver_strategy\": \""); - writer.print(locString); - writer.print("\""); - } - } - - { - // output node global properties - final GlobalProperties gp = p.getGlobalProperties(); - - writer.print(",\n\t\t\"global_properties\": [\n"); - - addProperty(writer, "Partitioning", gp.getPartitioning().name(), true); - if (gp.getPartitioningFields() != null) { - addProperty(writer, "Partitioned on", gp.getPartitioningFields().toString(), false); - } - if (gp.getPartitioningOrdering() != null) { - addProperty(writer, "Partitioning Order", gp.getPartitioningOrdering().toString(), false); - } - else { - addProperty(writer, "Partitioning Order", "(none)", false); - } - if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) { - addProperty(writer, "Uniqueness", "not unique", false); - } - else { - addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false); - } - - writer.print("\n\t\t]"); - } - - { - // output node local properties - LocalProperties lp = p.getLocalProperties(); - - writer.print(",\n\t\t\"local_properties\": [\n"); - - if (lp.getOrdering() != null) { - addProperty(writer, "Order", lp.getOrdering().toString(), true); - } - else { - addProperty(writer, "Order", "(none)", true); - } - if (lp.getGroupedFields() != null && lp.getGroupedFields().size() > 0) { - addProperty(writer, "Grouped on", lp.getGroupedFields().toString(), false); - } else { - addProperty(writer, "Grouping", "not grouped", false); - } - if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) { - addProperty(writer, "Uniqueness", "not unique", false); - } - else { - addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false); - } - - writer.print("\n\t\t]"); - } - - // output node size estimates - writer.print(",\n\t\t\"estimates\": [\n"); - - addProperty(writer, "Est. Output Size", n.getEstimatedOutputSize() == -1 ? "(unknown)" - : formatNumber(n.getEstimatedOutputSize(), "B"), true); - addProperty(writer, "Est. Cardinality", n.getEstimatedNumRecords() == -1 ? "(unknown)" - : formatNumber(n.getEstimatedNumRecords()), false); - - writer.print("\t\t]"); - - // output node cost - if (p.getNodeCosts() != null) { - writer.print(",\n\t\t\"costs\": [\n"); - - addProperty(writer, "Network", p.getNodeCosts().getNetworkCost() == -1 ? "(unknown)" - : formatNumber(p.getNodeCosts().getNetworkCost(), "B"), true); - addProperty(writer, "Disk I/O", p.getNodeCosts().getDiskCost() == -1 ? "(unknown)" - : formatNumber(p.getNodeCosts().getDiskCost(), "B"), false); - addProperty(writer, "CPU", p.getNodeCosts().getCpuCost() == -1 ? "(unknown)" - : formatNumber(p.getNodeCosts().getCpuCost(), ""), false); - - addProperty(writer, "Cumulative Network", - p.getCumulativeCosts().getNetworkCost() == -1 ? "(unknown)" : formatNumber(p - .getCumulativeCosts().getNetworkCost(), "B"), false); - addProperty(writer, "Cumulative Disk I/O", - p.getCumulativeCosts().getDiskCost() == -1 ? "(unknown)" : formatNumber(p - .getCumulativeCosts().getDiskCost(), "B"), false); - addProperty(writer, "Cumulative CPU", - p.getCumulativeCosts().getCpuCost() == -1 ? "(unknown)" : formatNumber(p - .getCumulativeCosts().getCpuCost(), ""), false); - - writer.print("\n\t\t]"); - } - - // output the node compiler hints - if (n.getOperator().getCompilerHints() != null) { - CompilerHints hints = n.getOperator().getCompilerHints(); - CompilerHints defaults = new CompilerHints(); - - String size = hints.getOutputSize() == defaults.getOutputSize() ? "(none)" : String.valueOf(hints.getOutputSize()); - String card = hints.getOutputCardinality() == defaults.getOutputCardinality() ? "(none)" : String.valueOf(hints.getOutputCardinality()); - String width = hints.getAvgOutputRecordSize() == defaults.getAvgOutputRecordSize() ? "(none)" : String.valueOf(hints.getAvgOutputRecordSize()); - String filter = hints.getFilterFactor() == defaults.getFilterFactor() ? "(none)" : String.valueOf(hints.getFilterFactor()); - - writer.print(",\n\t\t\"compiler_hints\": [\n"); - - addProperty(writer, "Output Size (bytes)", size, true); - addProperty(writer, "Output Cardinality", card, false); - addProperty(writer, "Avg. Output Record Size (bytes)", width, false); - addProperty(writer, "Filter Factor", filter, false); - - writer.print("\t\t]"); - } - - // finish node - writer.print("\n\t}"); - return true; - } - - private void addProperty(PrintWriter writer, String name, String value, boolean first) { - if (!first) { - writer.print(",\n"); - } - writer.print("\t\t\t{ \"name\": \""); - writer.print(name); - writer.print("\", \"value\": \""); - writer.print(value); - writer.print("\" }"); - } - - public static final String formatNumber(double number) { - return formatNumber(number, ""); - } - - public static final String formatNumber(double number, String suffix) { - if (number <= 0.0) { - return String.valueOf(number); - } - - int power = (int) Math.ceil(Math.log10(number)); - - int group = (power - 1) / 3; - if (group >= SIZE_SUFFIXES.length) { - group = SIZE_SUFFIXES.length - 1; - } else if (group < 0) { - group = 0; - } - - // truncate fractional part - int beforeDecimal = power - group * 3; - if (power > beforeDecimal) { - for (int i = power - beforeDecimal; i > 0; i--) { - number /= 10; - } - } - - return group > 0 ? String.format(Locale.US, "%.2f %s", number, SIZE_SUFFIXES[group]) : - String.format(Locale.US, "%.2f", number); - } - - private static final char[] SIZE_SUFFIXES = { 0, 'K', 'M', 'G', 'T' }; -}