http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java new file mode 100644 index 0000000..95adace --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.plan; + +import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; +import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.typeutils.TypeComparatorFactory; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.costs.Costs; +import org.apache.flink.optimizer.dag.OptimizerNode; +import org.apache.flink.optimizer.dag.WorksetIterationNode; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.util.Visitor; + +/** + * A node in the execution, representing a workset iteration (delta iteration). 
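+ * It holds the plan nodes for the solution set and the workset, as well as the two roots of
+ * the step function: the solution set delta and the next workset.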
+ */ +public class WorksetIterationPlanNode extends DualInputPlanNode implements IterationPlanNode { + + private final SolutionSetPlanNode solutionSetPlanNode; + + private final WorksetPlanNode worksetPlanNode; + + private final PlanNode solutionSetDeltaPlanNode; + + private final PlanNode nextWorkSetPlanNode; + + private TypeSerializerFactory<?> worksetSerializer; + + private TypeSerializerFactory<?> solutionSetSerializer; + + private TypeComparatorFactory<?> solutionSetComparator; + + private boolean immediateSolutionSetUpdate; + + public Object postPassHelper; + + private TypeSerializerFactory<?> serializerForIterationChannel; + + // -------------------------------------------------------------------------------------------- + + public WorksetIterationPlanNode(WorksetIterationNode template, String nodeName, Channel initialSolutionSet, Channel initialWorkset, + SolutionSetPlanNode solutionSetPlanNode, WorksetPlanNode worksetPlanNode, + PlanNode nextWorkSetPlanNode, PlanNode solutionSetDeltaPlanNode) + { + super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.BINARY_NO_OP); + this.solutionSetPlanNode = solutionSetPlanNode; + this.worksetPlanNode = worksetPlanNode; + this.solutionSetDeltaPlanNode = solutionSetDeltaPlanNode; + this.nextWorkSetPlanNode = nextWorkSetPlanNode; + + mergeBranchPlanMaps(); + + } + + // -------------------------------------------------------------------------------------------- + + public WorksetIterationNode getIterationNode() { + if (this.template instanceof WorksetIterationNode) { + return (WorksetIterationNode) this.template; + } else { + throw new RuntimeException(); + } + } + + public SolutionSetPlanNode getSolutionSetPlanNode() { + return this.solutionSetPlanNode; + } + + public WorksetPlanNode getWorksetPlanNode() { + return this.worksetPlanNode; + } + + public PlanNode getSolutionSetDeltaPlanNode() { + return this.solutionSetDeltaPlanNode; + } + + public PlanNode getNextWorkSetPlanNode() { + return this.nextWorkSetPlanNode; + } + + public Channel getInitialSolutionSetInput() { + return getInput1(); + } + + public Channel getInitialWorksetInput() { + return getInput2(); + } + + public void setImmediateSolutionSetUpdate(boolean immediateUpdate) { + this.immediateSolutionSetUpdate = immediateUpdate; + } + + public boolean isImmediateSolutionSetUpdate() { + return this.immediateSolutionSetUpdate; + } + + public FieldList getSolutionSetKeyFields() { + return getIterationNode().getSolutionSetKeyFields(); + } + + // -------------------------------------------------------------------------------------------- + + public TypeSerializerFactory<?> getWorksetSerializer() { + return worksetSerializer; + } + + public void setWorksetSerializer(TypeSerializerFactory<?> worksetSerializer) { + this.worksetSerializer = worksetSerializer; + } + + public TypeSerializerFactory<?> getSolutionSetSerializer() { + return solutionSetSerializer; + } + + public void setSolutionSetSerializer(TypeSerializerFactory<?> solutionSetSerializer) { + this.solutionSetSerializer = solutionSetSerializer; + } + + public TypeComparatorFactory<?> getSolutionSetComparator() { + return solutionSetComparator; + } + + public void setSolutionSetComparator(TypeComparatorFactory<?> solutionSetComparator) { + this.solutionSetComparator = solutionSetComparator; + } + + // -------------------------------------------------------------------------------------------- + + public void setCosts(Costs nodeCosts) { + // add the costs from the step function + 
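// (the solution-set delta and the next workset are the two roots of the
+		// step function; their cumulative cost shares are added to this node's own costs)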
+		nodeCosts.addCosts(this.solutionSetDeltaPlanNode.getCumulativeCostsShare());
+		nodeCosts.addCosts(this.nextWorkSetPlanNode.getCumulativeCostsShare());
+
+		super.setCosts(nodeCosts);
+	}
+
+	public int getMemoryConsumerWeight() {
+		// solution set index and workset back channel
+		return 2;
+	}
+
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE;
+		}
+
+		SourceAndDamReport fromOutside = super.hasDamOnPathDownTo(source);
+
+		if (fromOutside == FOUND_SOURCE_AND_DAM) {
+			return FOUND_SOURCE_AND_DAM;
+		}
+		else if (fromOutside == FOUND_SOURCE) {
+			// we always have a dam in the solution set index
+			return FOUND_SOURCE_AND_DAM;
+		} else {
+			SourceAndDamReport fromNextWorkset = nextWorkSetPlanNode.hasDamOnPathDownTo(source);
+
+			if (fromNextWorkset == FOUND_SOURCE_AND_DAM) {
+				return FOUND_SOURCE_AND_DAM;
+			} else if (fromNextWorkset == FOUND_SOURCE) {
+				return FOUND_SOURCE_AND_DAM;
+			} else {
+				return this.solutionSetDeltaPlanNode.hasDamOnPathDownTo(source);
+			}
+		}
+	}
+
+	@Override
+	public void acceptForStepFunction(Visitor<PlanNode> visitor) {
+		this.solutionSetDeltaPlanNode.accept(visitor);
+		this.nextWorkSetPlanNode.accept(visitor);
+	}
+
+	/**
+	 * Merging can only take place after the solutionSetDelta and nextWorkset plan nodes have
+	 * been set, because they may also contain some of the branching nodes.
+	 */
+	@Override
+	protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) {}
+
+	protected void mergeBranchPlanMaps() {
+		Map<OptimizerNode, PlanNode> branchPlan1 = input1.getSource().branchPlan;
+		Map<OptimizerNode, PlanNode> branchPlan2 = input2.getSource().branchPlan;
+
+		// merge the branchPlan maps according to the template's unclosed branches
+		if (this.template.hasUnclosedBranches()) {
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>(8);
+			}
+
+			for (OptimizerNode.UnclosedBranchDescriptor uc : this.template.getOpenBranches()) {
+				OptimizerNode brancher = uc.getBranchingNode();
+				PlanNode selectedCandidate = null;
+
+				if (branchPlan1 != null) {
+					// predecessor 1 has branching children, see if it got the branch we are looking for
+					selectedCandidate = branchPlan1.get(brancher);
+				}
+
+				if (selectedCandidate == null && branchPlan2 != null) {
+					// predecessor 2 has branching children, see if it got the branch we are looking for
+					selectedCandidate = branchPlan2.get(brancher);
+				}
+
+				if (selectedCandidate == null && getSolutionSetDeltaPlanNode() != null &&
+						getSolutionSetDeltaPlanNode().branchPlan != null) {
+					selectedCandidate = getSolutionSetDeltaPlanNode().branchPlan.get(brancher);
+				}
+
+				if (selectedCandidate == null && getNextWorkSetPlanNode() != null &&
+						getNextWorkSetPlanNode().branchPlan != null) {
+					selectedCandidate = getNextWorkSetPlanNode().branchPlan.get(brancher);
+				}
+
+				if (selectedCandidate == null) {
+					throw new CompilerException(
+						"Candidates for a node with open branches are missing information about the selected candidate.");
+				}
+
+				this.branchPlan.put(brancher, selectedCandidate);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public TypeSerializerFactory<?> getSerializerForIterationChannel() {
+		return serializerForIterationChannel;
+	}
+
+	public void setSerializerForIterationChannel(TypeSerializerFactory<?> serializerForIterationChannel) {
+		this.serializerForIterationChannel = serializerForIterationChannel;
+	}
+}
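For orientation, the iteration's step function (rooted at the solution-set delta and the next workset) is traversed through the Visitor callbacks used in acceptForStepFunction(..) above. The following is a minimal sketch, not part of this patch, of a hypothetical helper that counts the distinct plan nodes of a step function; it relies only on org.apache.flink.util.Visitor and the accessors introduced in this file:

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.util.Visitor;

/** Hypothetical helper: counts the distinct plan nodes that make up a step function. */
class StepFunctionNodeCounter implements Visitor<PlanNode> {

	private final Set<PlanNode> seen = new HashSet<PlanNode>();

	@Override
	public boolean preVisit(PlanNode visitable) {
		// descend into the node's inputs only on the first visit,
		// so nodes reachable via several paths are counted once
		return this.seen.add(visitable);
	}

	@Override
	public void postVisit(PlanNode visitable) {}

	static int countStepFunctionNodes(WorksetIterationPlanNode iteration) {
		StepFunctionNodeCounter counter = new StepFunctionNodeCounter();
		// visits the solution-set delta sub-plan, then the next-workset sub-plan
		iteration.acceptForStepFunction(counter);
		return counter.seen.size();
	}
}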
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
new file mode 100644
index 0000000..8d044d6
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.WorksetNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for the workset of a workset (delta) iteration.
+ */ +public class WorksetPlanNode extends PlanNode { + + private static final Costs NO_COSTS = new Costs(); + + private WorksetIterationPlanNode containingIterationNode; + + private final Channel initialInput; + + public Object postPassHelper; + + + public WorksetPlanNode(WorksetNode template, String nodeName, + GlobalProperties gProps, LocalProperties lProps, + Channel initialInput) + { + super(template, nodeName, DriverStrategy.NONE); + + this.globalProps = gProps; + this.localProps = lProps; + this.initialInput = initialInput; + + // the node incurs no cost + this.nodeCosts = NO_COSTS; + this.cumulativeCosts = NO_COSTS; + + if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) { + if (this.branchPlan == null) { + this.branchPlan = new HashMap<OptimizerNode, PlanNode>(); + } + + this.branchPlan.putAll(initialInput.getSource().branchPlan); + } + } + + // -------------------------------------------------------------------------------------------- + + public WorksetNode getWorksetNode() { + return (WorksetNode) this.template; + } + + public WorksetIterationPlanNode getContainingIterationNode() { + return this.containingIterationNode; + } + + public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) { + this.containingIterationNode = containingIterationNode; + } + + // -------------------------------------------------------------------------------------------- + + + @Override + public void accept(Visitor<PlanNode> visitor) { + if (visitor.preVisit(this)) { + visitor.postVisit(this); + } + } + + + @Override + public Iterable<PlanNode> getPredecessors() { + return Collections.<PlanNode>emptyList(); + } + + + @Override + public Iterable<Channel> getInputs() { + return Collections.<Channel>emptyList(); + } + + + @Override + public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { + if (source == this) { + return FOUND_SOURCE; + } + SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source); + if (res == FOUND_SOURCE_AND_DAM) { + return FOUND_SOURCE_AND_DAM; + } + else if (res == FOUND_SOURCE) { + return (this.initialInput.getLocalStrategy().dams() || + this.initialInput.getTempMode().breaksPipeline() || + getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ? + FOUND_SOURCE_AND_DAM : FOUND_SOURCE; + } + else { + return NOT_FOUND; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java new file mode 100644 index 0000000..3f8cb46 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+
+/**
+ * A connection between two {@link DumpableNode}s, as consumed by the plan JSON dump generator.
+ */
+public interface DumpableConnection<T extends DumpableNode<T>> {
+
+	public DumpableNode<T> getSource();
+
+	public ShipStrategyType getShipStrategy();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
new file mode 100644
index 0000000..1bc0f0c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.plan.PlanNode;
+
+/**
+ * A node in a program plan that can be dumped to JSON, i.e., either an
+ * {@link OptimizerNode} or a {@link PlanNode}.
+ */
+public interface DumpableNode<T extends DumpableNode<T>> {
+
+	/**
+	 * Gets the predecessors of this node.
+	 *
+	 * @return The predecessors.
+	 */
+	Iterable<T> getPredecessors();
+
+	Iterable<DumpableConnection<T>> getDumpableInputs();
+
+	OptimizerNode getOptimizerNode();
+
+	PlanNode getPlanNode();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
new file mode 100644
index 0000000..6f918c0
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.operators.CompilerHints;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.optimizer.dag.BulkIterationNode;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.dag.DataSourceNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.DagConnection;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.dag.WorksetIterationNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * Traverses an optimizer plan (or a DAG of optimizer nodes) and writes it out
+ * as its JSON representation, as used for plan visualization.
+ */
+public class PlanJSONDumpGenerator {
+
+	private Map<DumpableNode<?>, Integer> nodeIds; // resolves pact nodes to ids
+
+	private int nodeCnt;
+
+	private boolean encodeForHTML;
+
+	// --------------------------------------------------------------------------------------------
+
+	public void setEncodeForHTML(boolean encodeForHTML) {
+		this.encodeForHTML = encodeForHTML;
+	}
+
+	public boolean isEncodeForHTML() {
+		return encodeForHTML;
+	}
+
+
+	public void dumpPactPlanAsJSON(List<DataSinkNode> nodes, PrintWriter writer) {
+		@SuppressWarnings("unchecked")
+		List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes;
+		compilePlanToJSON(n, writer);
+	}
+
+	public String getPactPlanAsJSON(List<DataSinkNode> nodes) {
+		StringWriter sw = new StringWriter();
+		PrintWriter pw = new PrintWriter(sw);
+		dumpPactPlanAsJSON(nodes, pw);
+		return sw.toString();
+	}
+
+	public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, File toFile) throws IOException {
+		PrintWriter pw = null;
+		try {
+			pw = new PrintWriter(new FileOutputStream(toFile), false);
+			dumpOptimizerPlanAsJSON(plan, pw);
+			pw.flush();
+		} finally {
+			if (pw != null) {
+				pw.close();
+			}
+		}
+	}
+
+	public String getOptimizerPlanAsJSON(OptimizedPlan plan) {
+		StringWriter sw = new StringWriter();
+		PrintWriter pw = new PrintWriter(sw);
+		dumpOptimizerPlanAsJSON(plan, pw);
+		pw.close();
+		return sw.toString();
+	}
+
+	public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, PrintWriter writer) {
+		Collection<SinkPlanNode> sinks = plan.getDataSinks();
+		if (sinks instanceof List) {
+			dumpOptimizerPlanAsJSON((List<SinkPlanNode>) sinks, writer);
+		} else {
+			List<SinkPlanNode> n = new ArrayList<SinkPlanNode>();
+			n.addAll(sinks);
+			dumpOptimizerPlanAsJSON(n, writer);
+		}
+	}
+
+	public void dumpOptimizerPlanAsJSON(List<SinkPlanNode> nodes, PrintWriter writer) {
+		@SuppressWarnings("unchecked")
+		List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes;
+		compilePlanToJSON(n, writer);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void compilePlanToJSON(List<DumpableNode<?>> nodes, PrintWriter writer) {
+		// initialization to assign node ids
+		this.nodeIds = new HashMap<DumpableNode<?>, Integer>();
+		this.nodeCnt = 0;
+
+		// JSON header
+		writer.print("{\n\t\"nodes\": [\n\n");
+
+		// generate JSON for the plan
+		for (int i = 0; i < nodes.size(); i++) {
+			visit(nodes.get(i), writer, i == 0);
+		}
+
+		// JSON footer
+		writer.println("\n\t]\n}");
+	}
+
+	private boolean visit(DumpableNode<?> node, PrintWriter writer, boolean first) {
+		// check for duplicate traversal
+		if (this.nodeIds.containsKey(node)) {
+			return false;
+		}
+
+		// assign an id first
+		this.nodeIds.put(node, this.nodeCnt++);
+
+		// then recurse
+		for (DumpableNode<?> child : node.getPredecessors()) {
+			// important: if the child was already in the graph, 'first' must not be set to false!
+			if (visit(child, writer, first)) {
+				first = false;
+			}
+		}
+
+		// the optimizer node that corresponds to this dumpable node
+		final OptimizerNode n = node.getOptimizerNode();
+
+		// ------------------ dump after the ascend ---------------------
+		// start a new node and output node id
+		if (!first) {
+			writer.print(",\n");
+		}
+		// open the node
+		writer.print("\t{\n");
+
+		// recurse if it is an iteration node
+		if (node instanceof BulkIterationNode || node instanceof BulkIterationPlanNode) {
+
+			DumpableNode<?> innerChild = node instanceof BulkIterationNode ?
+					((BulkIterationNode) node).getNextPartialSolution() :
+					((BulkIterationPlanNode) node).getRootOfStepFunction();
+
+			DumpableNode<?> begin = node instanceof BulkIterationNode ?
+					((BulkIterationNode) node).getPartialSolution() :
+					((BulkIterationPlanNode) node).getPartialSolutionPlanNode();
+
+			writer.print("\t\t\"step_function\": [\n");
+
+			visit(innerChild, writer, true);
+
+			writer.print("\n\t\t],\n");
+			writer.print("\t\t\"partial_solution\": " + this.nodeIds.get(begin) + ",\n");
+			writer.print("\t\t\"next_partial_solution\": " + this.nodeIds.get(innerChild) + ",\n");
+		} else if (node instanceof WorksetIterationNode || node instanceof WorksetIterationPlanNode) {
+
+			DumpableNode<?> worksetRoot = node instanceof WorksetIterationNode ?
+					((WorksetIterationNode) node).getNextWorkset() :
+					((WorksetIterationPlanNode) node).getNextWorkSetPlanNode();
+			DumpableNode<?> solutionDelta = node instanceof WorksetIterationNode ?
+					((WorksetIterationNode) node).getSolutionSetDelta() :
+					((WorksetIterationPlanNode) node).getSolutionSetDeltaPlanNode();
+
+			DumpableNode<?> workset = node instanceof WorksetIterationNode ?
+					((WorksetIterationNode) node).getWorksetNode() :
+					((WorksetIterationPlanNode) node).getWorksetPlanNode();
+			DumpableNode<?> solutionSet = node instanceof WorksetIterationNode ?
+					((WorksetIterationNode) node).getSolutionSetNode() :
+					((WorksetIterationPlanNode) node).getSolutionSetPlanNode();
+
+			writer.print("\t\t\"step_function\": [\n");
+
+			visit(worksetRoot, writer, true);
+			visit(solutionDelta, writer, false);
+
+			writer.print("\n\t\t],\n");
+			writer.print("\t\t\"workset\": " + this.nodeIds.get(workset) + ",\n");
+			writer.print("\t\t\"solution_set\": " + this.nodeIds.get(solutionSet) + ",\n");
+			writer.print("\t\t\"next_workset\": " + this.nodeIds.get(worksetRoot) + ",\n");
+			writer.print("\t\t\"solution_delta\": " + this.nodeIds.get(solutionDelta) + ",\n");
+		}
+
+		// print the id
+		writer.print("\t\t\"id\": " + this.nodeIds.get(node));
+
+		final String type;
+		String contents;
+		if (n instanceof DataSinkNode) {
+			type = "sink";
+			contents = n.getOperator().toString();
+		} else if (n instanceof DataSourceNode) {
+			type = "source";
+			contents = n.getOperator().toString();
+		} else if (n instanceof BulkIterationNode) {
+			type = "bulk_iteration";
+			contents = n.getOperator().getName();
+		} else if (n instanceof WorksetIterationNode) {
+			type = "workset_iteration";
+			contents = n.getOperator().getName();
+		} else if (n instanceof BinaryUnionNode) {
+			type = "pact";
+			contents = "";
+		} else {
+			type = "pact";
+			contents = n.getOperator().getName();
+		}
+
+		contents = StringUtils.showControlCharacters(contents);
+		if (encodeForHTML) {
+			contents = StringEscapeUtils.escapeHtml4(contents);
+			contents = contents.replace("\\", "&#92;");
+		}
+
+		String name = n.getName();
+		if (name.equals("Reduce") && (node instanceof SingleInputPlanNode) &&
+				((SingleInputPlanNode) node).getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE) {
+			name = "Combine";
+		}
+
+		// output the type identifier
+		writer.print(",\n\t\t\"type\": \"" + type + "\"");
+
+		// output node name
+		writer.print(",\n\t\t\"pact\": \"" + name + "\"");
+
+		// output node contents
+		writer.print(",\n\t\t\"contents\": \"" + contents + "\"");
+
+		// degree of parallelism
+		writer.print(",\n\t\t\"parallelism\": \""
+			+ (n.getParallelism() >= 1 ? n.getParallelism() : "default") + "\"");
+
+		// output node predecessors
+		Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator();
+		String child1name = "", child2name = "";
+
+		if (inConns != null && inConns.hasNext()) {
+			// start predecessor list
+			writer.print(",\n\t\t\"predecessors\": [");
+			int inputNum = 0;
+
+			while (inConns.hasNext()) {
+				final DumpableConnection<?> inConn = inConns.next();
+				final DumpableNode<?> source = inConn.getSource();
+				writer.print(inputNum == 0 ? "\n" : ",\n");
+				if (inputNum == 0) {
+					child1name += child1name.length() > 0 ? ", " : "";
+					child1name += source.getOptimizerNode().getOperator().getName();
+				} else if (inputNum == 1) {
+					child2name += child2name.length() > 0 ? ", " : "";
+					child2name += source.getOptimizerNode().getOperator().getName();
+				}
+
+				// output predecessor id
+				writer.print("\t\t\t{\"id\": " + this.nodeIds.get(source));
+
+				// output connection side
+				if (inConns.hasNext() || inputNum > 0) {
+					writer.print(", \"side\": \"" + (inputNum == 0 ? "first" : "second") + "\"");
+				}
+				// output shipping strategy and channel type
+				final Channel channel = (inConn instanceof Channel) ? (Channel) inConn : null;
+				final ShipStrategyType shipType = channel != null ?
channel.getShipStrategy() : + ((DagConnection) inConn).getShipStrategy(); + + String shipStrategy = null; + if (shipType != null) { + switch (shipType) { + case NONE: + // nothing + break; + case FORWARD: + shipStrategy = "Forward"; + break; + case BROADCAST: + shipStrategy = "Broadcast"; + break; + case PARTITION_HASH: + shipStrategy = "Hash Partition"; + break; + case PARTITION_RANGE: + shipStrategy = "Range Partition"; + break; + case PARTITION_RANDOM: + shipStrategy = "Redistribute"; + break; + case PARTITION_FORCED_REBALANCE: + shipStrategy = "Rebalance"; + break; + case PARTITION_CUSTOM: + shipStrategy = "Custom Partition"; + break; + default: + throw new CompilerException("Unknown ship strategy '" + inConn.getShipStrategy().name() + + "' in JSON generator."); + } + } + + if (channel != null && channel.getShipStrategyKeys() != null && channel.getShipStrategyKeys().size() > 0) { + shipStrategy += " on " + (channel.getShipStrategySortOrder() == null ? + channel.getShipStrategyKeys().toString() : + Utils.createOrdering(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder()).toString()); + } + + if (shipStrategy != null) { + writer.print(", \"ship_strategy\": \"" + shipStrategy + "\""); + } + + if (channel != null) { + String localStrategy = null; + switch (channel.getLocalStrategy()) { + case NONE: + break; + case SORT: + localStrategy = "Sort"; + break; + case COMBININGSORT: + localStrategy = "Sort (combining)"; + break; + default: + throw new CompilerException("Unknown local strategy " + channel.getLocalStrategy().name()); + } + + if (channel != null && channel.getLocalStrategyKeys() != null && channel.getLocalStrategyKeys().size() > 0) { + localStrategy += " on " + (channel.getLocalStrategySortOrder() == null ? + channel.getLocalStrategyKeys().toString() : + Utils.createOrdering(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder()).toString()); + } + + if (localStrategy != null) { + writer.print(", \"local_strategy\": \"" + localStrategy + "\""); + } + + if (channel != null && channel.getTempMode() != TempMode.NONE) { + String tempMode = channel.getTempMode().toString(); + writer.print(", \"temp_mode\": \"" + tempMode + "\""); + } + } + + writer.print('}'); + inputNum++; + } + // finish predecessors + writer.print("\n\t\t]"); + } + + //--------------------------------------------------------------------------------------- + // the part below here is relevant only to plan nodes with concrete strategies, etc + //--------------------------------------------------------------------------------------- + + final PlanNode p = node.getPlanNode(); + if (p == null) { + // finish node + writer.print("\n\t}"); + return true; + } + // local strategy + String locString = null; + if (p.getDriverStrategy() != null) { + switch (p.getDriverStrategy()) { + case NONE: + case BINARY_NO_OP: + break; + + case UNARY_NO_OP: + locString = "No-Op"; + break; + + case COLLECTOR_MAP: + case MAP: + locString = "Map"; + break; + + case FLAT_MAP: + locString = "FlatMap"; + break; + + case MAP_PARTITION: + locString = "Map Partition"; + break; + + case ALL_REDUCE: + locString = "Reduce All"; + break; + + case ALL_GROUP_REDUCE: + case ALL_GROUP_REDUCE_COMBINE: + locString = "Group Reduce All"; + break; + + case SORTED_REDUCE: + locString = "Sorted Reduce"; + break; + + case SORTED_PARTIAL_REDUCE: + locString = "Sorted Combine/Reduce"; + break; + + case SORTED_GROUP_REDUCE: + locString = "Sorted Group Reduce"; + break; + + case SORTED_GROUP_COMBINE: + locString = "Sorted Combine"; + break; + + 
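// the two-input strategies below also report which input acts as the build /
+			// outer side, using the predecessor operator names (child1name / child2name)
+			// collected above
+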
case HYBRIDHASH_BUILD_FIRST: + locString = "Hybrid Hash (build: " + child1name + ")"; + break; + case HYBRIDHASH_BUILD_SECOND: + locString = "Hybrid Hash (build: " + child2name + ")"; + break; + + case HYBRIDHASH_BUILD_FIRST_CACHED: + locString = "Hybrid Hash (CACHED) (build: " + child1name + ")"; + break; + case HYBRIDHASH_BUILD_SECOND_CACHED: + locString = "Hybrid Hash (CACHED) (build: " + child2name + ")"; + break; + + case NESTEDLOOP_BLOCKED_OUTER_FIRST: + locString = "Nested Loops (Blocked Outer: " + child1name + ")"; + break; + case NESTEDLOOP_BLOCKED_OUTER_SECOND: + locString = "Nested Loops (Blocked Outer: " + child2name + ")"; + break; + case NESTEDLOOP_STREAMED_OUTER_FIRST: + locString = "Nested Loops (Streamed Outer: " + child1name + ")"; + break; + case NESTEDLOOP_STREAMED_OUTER_SECOND: + locString = "Nested Loops (Streamed Outer: " + child2name + ")"; + break; + + case MERGE: + locString = "Merge"; + break; + + case CO_GROUP: + locString = "Co-Group"; + break; + + default: + locString = p.getDriverStrategy().name(); + break; + } + + if (locString != null) { + writer.print(",\n\t\t\"driver_strategy\": \""); + writer.print(locString); + writer.print("\""); + } + } + + { + // output node global properties + final GlobalProperties gp = p.getGlobalProperties(); + + writer.print(",\n\t\t\"global_properties\": [\n"); + + addProperty(writer, "Partitioning", gp.getPartitioning().name(), true); + if (gp.getPartitioningFields() != null) { + addProperty(writer, "Partitioned on", gp.getPartitioningFields().toString(), false); + } + if (gp.getPartitioningOrdering() != null) { + addProperty(writer, "Partitioning Order", gp.getPartitioningOrdering().toString(), false); + } + else { + addProperty(writer, "Partitioning Order", "(none)", false); + } + if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) { + addProperty(writer, "Uniqueness", "not unique", false); + } + else { + addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false); + } + + writer.print("\n\t\t]"); + } + + { + // output node local properties + LocalProperties lp = p.getLocalProperties(); + + writer.print(",\n\t\t\"local_properties\": [\n"); + + if (lp.getOrdering() != null) { + addProperty(writer, "Order", lp.getOrdering().toString(), true); + } + else { + addProperty(writer, "Order", "(none)", true); + } + if (lp.getGroupedFields() != null && lp.getGroupedFields().size() > 0) { + addProperty(writer, "Grouped on", lp.getGroupedFields().toString(), false); + } else { + addProperty(writer, "Grouping", "not grouped", false); + } + if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) { + addProperty(writer, "Uniqueness", "not unique", false); + } + else { + addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false); + } + + writer.print("\n\t\t]"); + } + + // output node size estimates + writer.print(",\n\t\t\"estimates\": [\n"); + + addProperty(writer, "Est. Output Size", n.getEstimatedOutputSize() == -1 ? "(unknown)" + : formatNumber(n.getEstimatedOutputSize(), "B"), true); + addProperty(writer, "Est. Cardinality", n.getEstimatedNumRecords() == -1 ? "(unknown)" + : formatNumber(n.getEstimatedNumRecords()), false); + + writer.print("\t\t]"); + + // output node cost + if (p.getNodeCosts() != null) { + writer.print(",\n\t\t\"costs\": [\n"); + + addProperty(writer, "Network", p.getNodeCosts().getNetworkCost() == -1 ? "(unknown)" + : formatNumber(p.getNodeCosts().getNetworkCost(), "B"), true); + addProperty(writer, "Disk I/O", p.getNodeCosts().getDiskCost() == -1 ? 
"(unknown)" + : formatNumber(p.getNodeCosts().getDiskCost(), "B"), false); + addProperty(writer, "CPU", p.getNodeCosts().getCpuCost() == -1 ? "(unknown)" + : formatNumber(p.getNodeCosts().getCpuCost(), ""), false); + + addProperty(writer, "Cumulative Network", + p.getCumulativeCosts().getNetworkCost() == -1 ? "(unknown)" : formatNumber(p + .getCumulativeCosts().getNetworkCost(), "B"), false); + addProperty(writer, "Cumulative Disk I/O", + p.getCumulativeCosts().getDiskCost() == -1 ? "(unknown)" : formatNumber(p + .getCumulativeCosts().getDiskCost(), "B"), false); + addProperty(writer, "Cumulative CPU", + p.getCumulativeCosts().getCpuCost() == -1 ? "(unknown)" : formatNumber(p + .getCumulativeCosts().getCpuCost(), ""), false); + + writer.print("\n\t\t]"); + } + + // output the node compiler hints + if (n.getOperator().getCompilerHints() != null) { + CompilerHints hints = n.getOperator().getCompilerHints(); + CompilerHints defaults = new CompilerHints(); + + String size = hints.getOutputSize() == defaults.getOutputSize() ? "(none)" : String.valueOf(hints.getOutputSize()); + String card = hints.getOutputCardinality() == defaults.getOutputCardinality() ? "(none)" : String.valueOf(hints.getOutputCardinality()); + String width = hints.getAvgOutputRecordSize() == defaults.getAvgOutputRecordSize() ? "(none)" : String.valueOf(hints.getAvgOutputRecordSize()); + String filter = hints.getFilterFactor() == defaults.getFilterFactor() ? "(none)" : String.valueOf(hints.getFilterFactor()); + + writer.print(",\n\t\t\"compiler_hints\": [\n"); + + addProperty(writer, "Output Size (bytes)", size, true); + addProperty(writer, "Output Cardinality", card, false); + addProperty(writer, "Avg. Output Record Size (bytes)", width, false); + addProperty(writer, "Filter Factor", filter, false); + + writer.print("\t\t]"); + } + + // finish node + writer.print("\n\t}"); + return true; + } + + private void addProperty(PrintWriter writer, String name, String value, boolean first) { + if (!first) { + writer.print(",\n"); + } + writer.print("\t\t\t{ \"name\": \""); + writer.print(name); + writer.print("\", \"value\": \""); + writer.print(value); + writer.print("\" }"); + } + + public static final String formatNumber(double number) { + return formatNumber(number, ""); + } + + public static final String formatNumber(double number, String suffix) { + if (number <= 0.0) { + return String.valueOf(number); + } + + int power = (int) Math.ceil(Math.log10(number)); + + int group = (power - 1) / 3; + if (group >= SIZE_SUFFIXES.length) { + group = SIZE_SUFFIXES.length - 1; + } else if (group < 0) { + group = 0; + } + + // truncate fractional part + int beforeDecimal = power - group * 3; + if (power > beforeDecimal) { + for (int i = power - beforeDecimal; i > 0; i--) { + number /= 10; + } + } + + return group > 0 ? String.format(Locale.US, "%.2f %s", number, SIZE_SUFFIXES[group]) : + String.format(Locale.US, "%.2f", number); + } + + private static final char[] SIZE_SUFFIXES = { 0, 'K', 'M', 'G', 'T' }; +}