http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
new file mode 100644
index 0000000..95adace
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.WorksetIterationNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A node in the execution plan representing a workset iteration (delta iteration).
+ */
+public class WorksetIterationPlanNode extends DualInputPlanNode implements IterationPlanNode {
+
+       private final SolutionSetPlanNode solutionSetPlanNode;
+       
+       private final WorksetPlanNode worksetPlanNode;
+       
+       private final PlanNode solutionSetDeltaPlanNode;
+       
+       private final PlanNode nextWorkSetPlanNode;
+       
+       private TypeSerializerFactory<?> worksetSerializer;
+       
+       private TypeSerializerFactory<?> solutionSetSerializer;
+       
+       private TypeComparatorFactory<?> solutionSetComparator;
+       
+       private boolean immediateSolutionSetUpdate;
+       
+       public Object postPassHelper;
+       
+       private TypeSerializerFactory<?> serializerForIterationChannel;
+       
+       // --------------------------------------------------------------------------------------------
+
+       public WorksetIterationPlanNode(WorksetIterationNode template, String nodeName, Channel initialSolutionSet, Channel initialWorkset,
+                       SolutionSetPlanNode solutionSetPlanNode, WorksetPlanNode worksetPlanNode,
+                       PlanNode nextWorkSetPlanNode, PlanNode solutionSetDeltaPlanNode)
+       {
+               super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.BINARY_NO_OP);
+               this.solutionSetPlanNode = solutionSetPlanNode;
+               this.worksetPlanNode = worksetPlanNode;
+               this.solutionSetDeltaPlanNode = solutionSetDeltaPlanNode;
+               this.nextWorkSetPlanNode = nextWorkSetPlanNode;
+               
+               mergeBranchPlanMaps();
+
+       }
+
+       // --------------------------------------------------------------------------------------------
+       
+       public WorksetIterationNode getIterationNode() {
+               if (this.template instanceof WorksetIterationNode) {
+                       return (WorksetIterationNode) this.template;
+               } else {
+                       throw new RuntimeException("Template of a workset iteration plan node is not a WorksetIterationNode.");
+               }
+       }
+       
+       public SolutionSetPlanNode getSolutionSetPlanNode() {
+               return this.solutionSetPlanNode;
+       }
+       
+       public WorksetPlanNode getWorksetPlanNode() {
+               return this.worksetPlanNode;
+       }
+       
+       public PlanNode getSolutionSetDeltaPlanNode() {
+               return this.solutionSetDeltaPlanNode;
+       }
+       
+       public PlanNode getNextWorkSetPlanNode() {
+               return this.nextWorkSetPlanNode;
+       }
+       
+       public Channel getInitialSolutionSetInput() {
+               return getInput1();
+       }
+       
+       public Channel getInitialWorksetInput() {
+               return getInput2();
+       }
+       
+       public void setImmediateSolutionSetUpdate(boolean immediateUpdate) {
+               this.immediateSolutionSetUpdate = immediateUpdate;
+       }
+       
+       public boolean isImmediateSolutionSetUpdate() {
+               return this.immediateSolutionSetUpdate;
+       }
+       
+       public FieldList getSolutionSetKeyFields() {
+               return getIterationNode().getSolutionSetKeyFields();
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       
+       public TypeSerializerFactory<?> getWorksetSerializer() {
+               return worksetSerializer;
+       }
+       
+       public void setWorksetSerializer(TypeSerializerFactory<?> worksetSerializer) {
+               this.worksetSerializer = worksetSerializer;
+       }
+       
+       public TypeSerializerFactory<?> getSolutionSetSerializer() {
+               return solutionSetSerializer;
+       }
+       
+       public void setSolutionSetSerializer(TypeSerializerFactory<?> solutionSetSerializer) {
+               this.solutionSetSerializer = solutionSetSerializer;
+       }
+       
+       public TypeComparatorFactory<?> getSolutionSetComparator() {
+               return solutionSetComparator;
+       }
+       
+       public void setSolutionSetComparator(TypeComparatorFactory<?> solutionSetComparator) {
+               this.solutionSetComparator = solutionSetComparator;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       
+       public void setCosts(Costs nodeCosts) {
+               // add the costs from the step function
+               nodeCosts.addCosts(this.solutionSetDeltaPlanNode.getCumulativeCostsShare());
+               nodeCosts.addCosts(this.nextWorkSetPlanNode.getCumulativeCostsShare());
+
+               super.setCosts(nodeCosts);
+       }
+       
+       public int getMemoryConsumerWeight() {
+               // solution set index and workset back channel
+               return 2;
+       }
+       
+       @Override
+       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+               if (source == this) {
+                       return FOUND_SOURCE;
+               }
+               
+               SourceAndDamReport fromOutside = super.hasDamOnPathDownTo(source);
+
+               if (fromOutside == FOUND_SOURCE_AND_DAM) {
+                       return FOUND_SOURCE_AND_DAM;
+               }
+               else if (fromOutside == FOUND_SOURCE) {
+                       // we always have a dam in the solution set index
+                       return FOUND_SOURCE_AND_DAM;
+               } else {
+                       SourceAndDamReport fromNextWorkset = nextWorkSetPlanNode.hasDamOnPathDownTo(source);
+
+                       if (fromNextWorkset == FOUND_SOURCE_AND_DAM) {
+                               return FOUND_SOURCE_AND_DAM;
+                       } else if (fromNextWorkset == FOUND_SOURCE) {
+                               return FOUND_SOURCE_AND_DAM;
+                       } else {
+                               return this.solutionSetDeltaPlanNode.hasDamOnPathDownTo(source);
+                       }
+               }
+       }
+
+       @Override
+       public void acceptForStepFunction(Visitor<PlanNode> visitor) {
+               this.solutionSetDeltaPlanNode.accept(visitor);
+               this.nextWorkSetPlanNode.accept(visitor);
+       }
+
+       /**
+        * Merging can only take place after the solutionSetDelta and nextWorkset plan nodes have been set,
+        * because they may also contain some of the branching nodes.
+        */
+       @Override
+       protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) {}
+
+       
+       protected void mergeBranchPlanMaps() {
+               Map<OptimizerNode, PlanNode> branchPlan1 = input1.getSource().branchPlan;
+               Map<OptimizerNode, PlanNode> branchPlan2 = input2.getSource().branchPlan;
+
+               // merge the branchPlan maps according to the template's unclosed branches
+               if (this.template.hasUnclosedBranches()) {
+                       if (this.branchPlan == null) {
+                               this.branchPlan = new HashMap<OptimizerNode, PlanNode>(8);
+                       }
+                       }
+
+                       for (OptimizerNode.UnclosedBranchDescriptor uc : this.template.getOpenBranches()) {
+                               OptimizerNode brancher = uc.getBranchingNode();
+                               PlanNode selectedCandidate = null;
+
+                               if (branchPlan1 != null) {
+                                       // predecessor 1 has branching children, see if it got the branch we are looking for
+                                       selectedCandidate = branchPlan1.get(brancher);
+                               }
+
+                               if (selectedCandidate == null && branchPlan2 != null) {
+                                       // predecessor 2 has branching children, see if it got the branch we are looking for
+                                       selectedCandidate = branchPlan2.get(brancher);
+                               }
+
+                               if (selectedCandidate == null && getSolutionSetDeltaPlanNode() != null && getSolutionSetDeltaPlanNode().branchPlan != null) {
+                                       selectedCandidate = getSolutionSetDeltaPlanNode().branchPlan.get(brancher);
+                               }
+
+                               if (selectedCandidate == null && getNextWorkSetPlanNode() != null && getNextWorkSetPlanNode().branchPlan != null) {
+                                       selectedCandidate = getNextWorkSetPlanNode().branchPlan.get(brancher);
+                               }
+
+                               if (selectedCandidate == null) {
+                                       throw new CompilerException(
+                                                       "Candidates for a node with open branches are missing information about the selected candidate.");
+                               }
+
+                               this.branchPlan.put(brancher, selectedCandidate);
+                       }
+               }
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       
+       public TypeSerializerFactory<?> getSerializerForIterationChannel() {
+               return serializerForIterationChannel;
+       }
+       
+       public void setSerializerForIterationChannel(TypeSerializerFactory<?> serializerForIterationChannel) {
+               this.serializerForIterationChannel = serializerForIterationChannel;
+       }
+}

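For context, a minimal sketch of how such a candidate is typically traversed: iteration candidates keep their step function out of the regular input chain, so a Visitor has to descend into it explicitly via acceptForStepFunction(...). The class below is illustrative only (it is not part of this commit) and assumes the optimizer classes above plus org.apache.flink.util.Visitor on the classpath.

    import org.apache.flink.optimizer.plan.IterationPlanNode;
    import org.apache.flink.optimizer.plan.PlanNode;
    import org.apache.flink.util.Visitor;

    /** Illustrative visitor that counts plan nodes, including those inside step functions. */
    public class StepFunctionAwareCounter implements Visitor<PlanNode> {

        private int count;

        @Override
        public boolean preVisit(PlanNode node) {
            count++;
            // iteration candidates keep their step function out of the regular inputs,
            // so it must be traversed explicitly
            if (node instanceof IterationPlanNode) {
                ((IterationPlanNode) node).acceptForStepFunction(this);
            }
            return true; // descend into the node's inputs as well
        }

        @Override
        public void postVisit(PlanNode node) {}

        public int getCount() {
            return count;
        }
    }

Starting the traversal with sinkNode.accept(counter) would then also reach the solution set delta and next workset candidates of a WorksetIterationPlanNode.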
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
new file mode 100644
index 0000000..8d044d6
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.WorksetNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for the workset of a workset (delta) iteration.
+ */
+public class WorksetPlanNode extends PlanNode {
+       
+       private static final Costs NO_COSTS = new Costs();
+       
+       private WorksetIterationPlanNode containingIterationNode;
+       
+       private final Channel initialInput;
+       
+       public Object postPassHelper;
+       
+       
+       public WorksetPlanNode(WorksetNode template, String nodeName,
+                       GlobalProperties gProps, LocalProperties lProps,
+                       Channel initialInput)
+       {
+               super(template, nodeName, DriverStrategy.NONE);
+               
+               this.globalProps = gProps;
+               this.localProps = lProps;
+               this.initialInput = initialInput;
+               
+               // the node incurs no cost
+               this.nodeCosts = NO_COSTS;
+               this.cumulativeCosts = NO_COSTS;
+               
+               if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) {
+                       if (this.branchPlan == null) {
+                               this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+                       }
+                       
+                       this.branchPlan.putAll(initialInput.getSource().branchPlan);
+               }
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       
+       public WorksetNode getWorksetNode() {
+               return (WorksetNode) this.template;
+       }
+       
+       public WorksetIterationPlanNode getContainingIterationNode() {
+               return this.containingIterationNode;
+       }
+       
+       public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) {
+               this.containingIterationNode = containingIterationNode;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       
+
+       @Override
+       public void accept(Visitor<PlanNode> visitor) {
+               if (visitor.preVisit(this)) {
+                       visitor.postVisit(this);
+               }
+       }
+
+
+       @Override
+       public Iterable<PlanNode> getPredecessors() {
+               return Collections.<PlanNode>emptyList();
+       }
+
+
+       @Override
+       public Iterable<Channel> getInputs() {
+               return Collections.<Channel>emptyList();
+       }
+
+
+       @Override
+       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+               if (source == this) {
+                       return FOUND_SOURCE;
+               }
+               SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source);
+               if (res == FOUND_SOURCE_AND_DAM) {
+                       return FOUND_SOURCE_AND_DAM;
+               }
+               else if (res == FOUND_SOURCE) {
+                       return (this.initialInput.getLocalStrategy().dams() ||
+                                       this.initialInput.getTempMode().breaksPipeline() ||
+                                       getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ?
+                               FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
+               }
+               else {
+                       return NOT_FOUND;
+               }
+       }
+}

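The three-valued SourceAndDamReport returned by hasDamOnPathDownTo(...) is what the optimizer uses to reason about pipelines between a producing and a consuming candidate. A hedged helper, purely for illustration (DamCheck is hypothetical, not a Flink class):

    import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
    import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;

    import org.apache.flink.optimizer.plan.PlanNode;

    /** Illustrative helper interpreting the three-valued SourceAndDamReport. */
    public final class DamCheck {

        private DamCheck() {}

        /** True if 'source' feeds 'consumer' and something on the path dams (materializes) the stream. */
        public static boolean pathIsDammed(PlanNode consumer, PlanNode source) {
            return consumer.hasDamOnPathDownTo(source) == FOUND_SOURCE_AND_DAM;
        }

        /** True if 'source' feeds 'consumer' through a fully pipelined path. */
        public static boolean pathIsPipelined(PlanNode consumer, PlanNode source) {
            return consumer.hasDamOnPathDownTo(source) == FOUND_SOURCE;
        }
    }

Any other result means 'source' does not occur on the path at all (NOT_FOUND).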
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
new file mode 100644
index 0000000..3f8cb46
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+
+/**
+ * A connection between two nodes of a dumpable plan or DAG, exposing its source node and ship strategy.
+ */
+public interface DumpableConnection<T extends DumpableNode<T>> {
+
+       public DumpableNode<T> getSource();
+       
+       public ShipStrategyType getShipStrategy();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
new file mode 100644
index 0000000..1bc0f0c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.plan.PlanNode;
+
+/**
+ * A node of a dumpable plan or DAG, backed by either an {@link OptimizerNode} or a {@link PlanNode}.
+ */
+public interface DumpableNode<T extends DumpableNode<T>> {
+       
+       /**
+        * Gets an iterator over the predecessors.
+        * 
+        * @return An iterator over the predecessors.
+        */
+       Iterable<T> getPredecessors();
+       
+       Iterable<DumpableConnection<T>> getDumpableInputs();
+       
+       OptimizerNode getOptimizerNode();
+       
+       PlanNode getPlanNode();
+}

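Together, these two interfaces let the same dump code walk both the optimizer DAG (OptimizerNode/DagConnection) and plan candidates (PlanNode/Channel). A small illustrative traversal, assuming only the interfaces above (DumpableGraphs is hypothetical):

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.flink.optimizer.plandump.DumpableConnection;
    import org.apache.flink.optimizer.plandump.DumpableNode;

    /** Hypothetical utility: generic pre-order walk over dumpable nodes. */
    public final class DumpableGraphs {

        private DumpableGraphs() {}

        /** Counts distinct nodes reachable from 'root' via predecessor links. */
        public static <T extends DumpableNode<T>> int countReachable(DumpableNode<T> root) {
            Set<DumpableNode<T>> seen = new HashSet<DumpableNode<T>>();
            collect(root, seen);
            return seen.size();
        }

        private static <T extends DumpableNode<T>> void collect(DumpableNode<T> node, Set<DumpableNode<T>> seen) {
            if (!seen.add(node)) {
                return; // already visited
            }
            for (DumpableConnection<T> conn : node.getDumpableInputs()) {
                System.out.println("input via " + conn.getShipStrategy());
            }
            for (T pred : node.getPredecessors()) {
                collect(pred, seen);
            }
        }
    }

This mirrors the duplicate-traversal check that PlanJSONDumpGenerator.visit(...) below performs when assigning node ids.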
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
new file mode 100644
index 0000000..6f918c0
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.operators.CompilerHints;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.optimizer.dag.BulkIterationNode;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.dag.DataSourceNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.DagConnection;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.dag.WorksetIterationNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * Translates an optimizer plan, or a DAG of unoptimized sinks, into a JSON representation, e.g., for visualization.
+ */
+public class PlanJSONDumpGenerator {
+       
+       private Map<DumpableNode<?>, Integer> nodeIds; // resolves pact nodes to ids
+
+       private int nodeCnt;
+       
+       private boolean encodeForHTML;
+
+       // --------------------------------------------------------------------------------------------
+       
+       public void setEncodeForHTML(boolean encodeForHTML) {
+               this.encodeForHTML = encodeForHTML;
+       }
+       
+       public boolean isEncodeForHTML() {
+               return encodeForHTML;
+       }
+       
+       
+       public void dumpPactPlanAsJSON(List<DataSinkNode> nodes, PrintWriter writer) {
+               @SuppressWarnings("unchecked")
+               List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes;
+               compilePlanToJSON(n, writer);
+       }
+       
+       public String getPactPlanAsJSON(List<DataSinkNode> nodes) {
+               StringWriter sw = new StringWriter();
+               PrintWriter pw = new PrintWriter(sw);
+               dumpPactPlanAsJSON(nodes, pw);
+               return sw.toString();
+       }
+       
+       public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, File toFile) throws IOException {
+               PrintWriter pw = null;
+               try {
+                       pw = new PrintWriter(new FileOutputStream(toFile), false);
+                       dumpOptimizerPlanAsJSON(plan, pw);
+                       pw.flush();
+               } finally {
+                       if (pw != null) {
+                               pw.close();
+                       }
+               }
+       }
+       
+       public String getOptimizerPlanAsJSON(OptimizedPlan plan) {
+               StringWriter sw = new StringWriter();
+               PrintWriter pw = new PrintWriter(sw);
+               dumpOptimizerPlanAsJSON(plan, pw);
+               pw.close();
+               return sw.toString();
+       }
+       
+       public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, PrintWriter writer) {
+               Collection<SinkPlanNode> sinks = plan.getDataSinks();
+               if (sinks instanceof List) {
+                       dumpOptimizerPlanAsJSON((List<SinkPlanNode>) sinks, writer);
+               } else {
+                       List<SinkPlanNode> n = new ArrayList<SinkPlanNode>();
+                       n.addAll(sinks);
+                       dumpOptimizerPlanAsJSON(n, writer);
+               }
+       }
+       
+       public void dumpOptimizerPlanAsJSON(List<SinkPlanNode> nodes, PrintWriter writer) {
+               @SuppressWarnings("unchecked")
+               List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes;
+               compilePlanToJSON(n, writer);
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       
+       private void compilePlanToJSON(List<DumpableNode<?>> nodes, PrintWriter writer) {
+               // initialization to assign node ids
+               this.nodeIds = new HashMap<DumpableNode<?>, Integer>();
+               this.nodeCnt = 0;
+               
+               // JSON header
+               writer.print("{\n\t\"nodes\": [\n\n");
+
+               // Generate JSON for plan
+               for (int i = 0; i < nodes.size(); i++) {
+                       visit(nodes.get(i), writer, i == 0);
+               }
+               
+               // JSON Footer
+               writer.println("\n\t]\n}");
+       }
+
+       private boolean visit(DumpableNode<?> node, PrintWriter writer, boolean first) {
+               // check for duplicate traversal
+               if (this.nodeIds.containsKey(node)) {
+                       return false;
+               }
+               
+               // assign an id first
+               this.nodeIds.put(node, this.nodeCnt++);
+               
+               // then recurse
+               for (DumpableNode<?> child : node.getPredecessors()) {
+                       // important: if the child was already in the graph, 'first' must not be set to false!
+                       if (visit(child, writer, first)) {
+                               first = false;
+                       }
+               }
+               
+               // look up the DAG node that backs this dumpable node
+               final OptimizerNode n = node.getOptimizerNode();
+               
+               // ------------------ dump after the ascend ---------------------
+               // start a new node and output node id
+               if (!first) {
+                       writer.print(",\n");    
+               }
+               // open the node
+               writer.print("\t{\n");
+               
+               // recurse if it is an iteration node
+               if (node instanceof BulkIterationNode || node instanceof BulkIterationPlanNode) {
+                       
+                       DumpableNode<?> innerChild = node instanceof BulkIterationNode ?
+                                       ((BulkIterationNode) node).getNextPartialSolution() :
+                                       ((BulkIterationPlanNode) node).getRootOfStepFunction();
+
+                       DumpableNode<?> begin = node instanceof BulkIterationNode ?
+                               ((BulkIterationNode) node).getPartialSolution() :
+                               ((BulkIterationPlanNode) node).getPartialSolutionPlanNode();
+
+                       writer.print("\t\t\"step_function\": [\n");
+                       
+                       visit(innerChild, writer, true);
+                       
+                       writer.print("\n\t\t],\n");
+                       writer.print("\t\t\"partial_solution\": " + this.nodeIds.get(begin) + ",\n");
+                       writer.print("\t\t\"next_partial_solution\": " + this.nodeIds.get(innerChild) + ",\n");
+               } else if (node instanceof WorksetIterationNode || node instanceof WorksetIterationPlanNode) {
+
+                       DumpableNode<?> worksetRoot = node instanceof WorksetIterationNode ?
+                                       ((WorksetIterationNode) node).getNextWorkset() :
+                                       ((WorksetIterationPlanNode) node).getNextWorkSetPlanNode();
+                       DumpableNode<?> solutionDelta = node instanceof WorksetIterationNode ?
+                                       ((WorksetIterationNode) node).getSolutionSetDelta() :
+                                       ((WorksetIterationPlanNode) node).getSolutionSetDeltaPlanNode();
+
+                       DumpableNode<?> workset = node instanceof WorksetIterationNode ?
+                                       ((WorksetIterationNode) node).getWorksetNode() :
+                                       ((WorksetIterationPlanNode) node).getWorksetPlanNode();
+                       DumpableNode<?> solutionSet = node instanceof WorksetIterationNode ?
+                                       ((WorksetIterationNode) node).getSolutionSetNode() :
+                                       ((WorksetIterationPlanNode) node).getSolutionSetPlanNode();
+
+                       writer.print("\t\t\"step_function\": [\n");
+                       
+                       visit(worksetRoot, writer, true);
+                       visit(solutionDelta, writer, false);
+                       
+                       writer.print("\n\t\t],\n");
+                       writer.print("\t\t\"workset\": " + this.nodeIds.get(workset) + ",\n");
+                       writer.print("\t\t\"solution_set\": " + this.nodeIds.get(solutionSet) + ",\n");
+                       writer.print("\t\t\"next_workset\": " + this.nodeIds.get(worksetRoot) + ",\n");
+                       writer.print("\t\t\"solution_delta\": " + this.nodeIds.get(solutionDelta) + ",\n");
+               }
+               
+               // print the id
+               writer.print("\t\t\"id\": " + this.nodeIds.get(node));
+
+               
+               final String type;
+               String contents;
+               if (n instanceof DataSinkNode) {
+                       type = "sink";
+                       contents = n.getOperator().toString();
+               } else if (n instanceof DataSourceNode) {
+                       type = "source";
+                       contents = n.getOperator().toString();
+               }
+               else if (n instanceof BulkIterationNode) {
+                       type = "bulk_iteration";
+                       contents = n.getOperator().getName();
+               }
+               else if (n instanceof WorksetIterationNode) {
+                       type = "workset_iteration";
+                       contents = n.getOperator().getName();
+               }
+               else if (n instanceof BinaryUnionNode) {
+                       type = "pact";
+                       contents = "";
+               }
+               else {
+                       type = "pact";
+                       contents = n.getOperator().getName();
+               }
+               
+               contents = StringUtils.showControlCharacters(contents);
+               if (encodeForHTML) {
+                       contents = StringEscapeUtils.escapeHtml4(contents);
+                       contents = contents.replace("\\", "&#92;");
+               }
+               
+               
+               String name = n.getName();
+               if (name.equals("Reduce") && (node instanceof SingleInputPlanNode) &&
+                               ((SingleInputPlanNode) node).getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE) {
+                       name = "Combine";
+               }
+               
+               // output the type identifier
+               writer.print(",\n\t\t\"type\": \"" + type + "\"");
+               
+               // output node name
+               writer.print(",\n\t\t\"pact\": \"" + name + "\"");
+               
+               // output node contents
+               writer.print(",\n\t\t\"contents\": \"" + contents + "\"");
+
+               // degree of parallelism
+               writer.print(",\n\t\t\"parallelism\": \""
+                       + (n.getParallelism() >= 1 ? n.getParallelism() : "default") + "\"");
+               
+               // output node predecessors
+               Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator();
+               String child1name = "", child2name = "";
+
+               if (inConns != null && inConns.hasNext()) {
+                       // start predecessor list
+                       writer.print(",\n\t\t\"predecessors\": [");
+                       int inputNum = 0;
+                       
+                       while (inConns.hasNext()) {
+                               final DumpableConnection<?> inConn = inConns.next();
+                               final DumpableNode<?> source = inConn.getSource();
+                               writer.print(inputNum == 0 ? "\n" : ",\n");
+                               if (inputNum == 0) {
+                                       child1name += child1name.length() > 0 ? ", " : "";
+                                       child1name += source.getOptimizerNode().getOperator().getName();
+                               } else if (inputNum == 1) {
+                                       child2name += child2name.length() > 0 ? ", " : "";
+                                       child2name += source.getOptimizerNode().getOperator().getName();
+                               }
+
+                               // output predecessor id
+                               writer.print("\t\t\t{\"id\": " + this.nodeIds.get(source));
+
+                               // output connection side
+                               if (inConns.hasNext() || inputNum > 0) {
+                                       writer.print(", \"side\": \"" + (inputNum == 0 ? "first" : "second") + "\"");
+                               }
+                               // output shipping strategy and channel type
+                               final Channel channel = (inConn instanceof Channel) ? (Channel) inConn : null;
+                               final ShipStrategyType shipType = channel != null ? channel.getShipStrategy() :
+                                               ((DagConnection) inConn).getShipStrategy();
+
+                               String shipStrategy = null;
+                               if (shipType != null) {
+                                       switch (shipType) {
+                                       case NONE:
+                                               // nothing
+                                               break;
+                                       case FORWARD:
+                                               shipStrategy = "Forward";
+                                               break;
+                                       case BROADCAST:
+                                               shipStrategy = "Broadcast";
+                                               break;
+                                       case PARTITION_HASH:
+                                               shipStrategy = "Hash Partition";
+                                               break;
+                                       case PARTITION_RANGE:
+                                               shipStrategy = "Range Partition";
+                                               break;
+                                       case PARTITION_RANDOM:
+                                               shipStrategy = "Redistribute";
+                                               break;
+                                       case PARTITION_FORCED_REBALANCE:
+                                               shipStrategy = "Rebalance";
+                                               break;
+                                       case PARTITION_CUSTOM:
+                                               shipStrategy = "Custom Partition";
+                                               break;
+                                       default:
+                                               throw new CompilerException("Unknown ship strategy '" + inConn.getShipStrategy().name()
+                                                       + "' in JSON generator.");
+                                       }
+                               }
+                               
+                               if (channel != null && channel.getShipStrategyKeys() != null && channel.getShipStrategyKeys().size() > 0) {
+                                       shipStrategy += " on " + (channel.getShipStrategySortOrder() == null ?
+                                                       channel.getShipStrategyKeys().toString() :
+                                                       Utils.createOrdering(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder()).toString());
+                               }
+
+                               if (shipStrategy != null) {
+                                       writer.print(", \"ship_strategy\": \"" + shipStrategy + "\"");
+                               }
+                               
+                               if (channel != null) {
+                                       String localStrategy = null;
+                                       switch (channel.getLocalStrategy()) {
+                                       case NONE:
+                                               break;
+                                       case SORT:
+                                               localStrategy = "Sort";
+                                               break;
+                                       case COMBININGSORT:
+                                               localStrategy = "Sort (combining)";
+                                               break;
+                                       default:
+                                               throw new CompilerException("Unknown local strategy " + channel.getLocalStrategy().name());
+                                       }
+
+                                       if (channel.getLocalStrategyKeys() != null && channel.getLocalStrategyKeys().size() > 0) {
+                                               localStrategy += " on " + (channel.getLocalStrategySortOrder() == null ?
+                                                               channel.getLocalStrategyKeys().toString() :
+                                                               Utils.createOrdering(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder()).toString());
+                                       }
+
+                                       if (localStrategy != null) {
+                                               writer.print(", \"local_strategy\": \"" + localStrategy + "\"");
+                                       }
+
+                                       if (channel.getTempMode() != TempMode.NONE) {
+                                               String tempMode = channel.getTempMode().toString();
+                                               writer.print(", \"temp_mode\": \"" + tempMode + "\"");
+                                       }
+                               }
+                               
+                               writer.print('}');
+                               inputNum++;
+                       }
+                       // finish predecessors
+                       writer.print("\n\t\t]");
+               }
+               
+               //---------------------------------------------------------------------------------------
+               // the part below here is relevant only to plan nodes with concrete strategies, etc
+               //---------------------------------------------------------------------------------------
+
+               final PlanNode p = node.getPlanNode();
+               if (p == null) {
+                       // finish node
+                       writer.print("\n\t}");
+                       return true;
+               }
+               // local strategy
+               String locString = null;
+               if (p.getDriverStrategy() != null) {
+                       switch (p.getDriverStrategy()) {
+                       case NONE:
+                       case BINARY_NO_OP:
+                               break;
+                               
+                       case UNARY_NO_OP:
+                               locString = "No-Op";
+                               break;
+                               
+                       case COLLECTOR_MAP:
+                       case MAP:
+                               locString = "Map";
+                               break;
+                               
+                       case FLAT_MAP:
+                               locString = "FlatMap";
+                               break;
+                               
+                       case MAP_PARTITION:
+                               locString = "Map Partition";
+                               break;
+                       
+                       case ALL_REDUCE:
+                               locString = "Reduce All";
+                               break;
+                       
+                       case ALL_GROUP_REDUCE:
+                       case ALL_GROUP_REDUCE_COMBINE:
+                               locString = "Group Reduce All";
+                               break;
+                               
+                       case SORTED_REDUCE:
+                               locString = "Sorted Reduce";
+                               break;
+                               
+                       case SORTED_PARTIAL_REDUCE:
+                               locString = "Sorted Combine/Reduce";
+                               break;
+
+                       case SORTED_GROUP_REDUCE:
+                               locString = "Sorted Group Reduce";
+                               break;
+                               
+                       case SORTED_GROUP_COMBINE:
+                               locString = "Sorted Combine";
+                               break;
+
+                       case HYBRIDHASH_BUILD_FIRST:
+                               locString = "Hybrid Hash (build: " + child1name + ")";
+                               break;
+                       case HYBRIDHASH_BUILD_SECOND:
+                               locString = "Hybrid Hash (build: " + child2name + ")";
+                               break;
+
+                       case HYBRIDHASH_BUILD_FIRST_CACHED:
+                               locString = "Hybrid Hash (CACHED) (build: " + child1name + ")";
+                               break;
+                       case HYBRIDHASH_BUILD_SECOND_CACHED:
+                               locString = "Hybrid Hash (CACHED) (build: " + child2name + ")";
+                               break;
+
+                       case NESTEDLOOP_BLOCKED_OUTER_FIRST:
+                               locString = "Nested Loops (Blocked Outer: " + child1name + ")";
+                               break;
+                       case NESTEDLOOP_BLOCKED_OUTER_SECOND:
+                               locString = "Nested Loops (Blocked Outer: " + child2name + ")";
+                               break;
+                       case NESTEDLOOP_STREAMED_OUTER_FIRST:
+                               locString = "Nested Loops (Streamed Outer: " + child1name + ")";
+                               break;
+                       case NESTEDLOOP_STREAMED_OUTER_SECOND:
+                               locString = "Nested Loops (Streamed Outer: " + child2name + ")";
+                               break;
+
+                       case MERGE:
+                               locString = "Merge";
+                               break;
+
+                       case CO_GROUP:
+                               locString = "Co-Group";
+                               break;
+
+                       default:
+                               locString = p.getDriverStrategy().name();
+                               break;
+                       }
+
+                       if (locString != null) {
+                               writer.print(",\n\t\t\"driver_strategy\": \"");
+                               writer.print(locString);
+                               writer.print("\"");
+                       }
+               }
+               
+               {
+                       // output node global properties
+                       final GlobalProperties gp = p.getGlobalProperties();
+
+                       writer.print(",\n\t\t\"global_properties\": [\n");
+
+                       addProperty(writer, "Partitioning", gp.getPartitioning().name(), true);
+                       if (gp.getPartitioningFields() != null) {
+                               addProperty(writer, "Partitioned on", gp.getPartitioningFields().toString(), false);
+                       }
+                       if (gp.getPartitioningOrdering() != null) {
+                               addProperty(writer, "Partitioning Order", gp.getPartitioningOrdering().toString(), false);
+                       }
+                       else {
+                               addProperty(writer, "Partitioning Order", "(none)", false);
+                       }
+                       if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) {
+                               addProperty(writer, "Uniqueness", "not unique", false);
+                       }
+                       else {
+                               addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false);
+                       }
+
+                       writer.print("\n\t\t]");
+               }
+
+               {
+                       // output node local properties
+                       LocalProperties lp = p.getLocalProperties();
+
+                       writer.print(",\n\t\t\"local_properties\": [\n");
+
+                       if (lp.getOrdering() != null) {
+                               addProperty(writer, "Order", lp.getOrdering().toString(), true);
+                       }
+                       else {
+                               addProperty(writer, "Order", "(none)", true);
+                       }
+                       if (lp.getGroupedFields() != null && lp.getGroupedFields().size() > 0) {
+                               addProperty(writer, "Grouped on", lp.getGroupedFields().toString(), false);
+                       } else {
+                               addProperty(writer, "Grouping", "not grouped", false);
+                       }
+                       if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) {
+                               addProperty(writer, "Uniqueness", "not unique", false);
+                       }
+                       else {
+                               addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false);
+                       }
+
+                       writer.print("\n\t\t]");
+               }
+
+               // output node size estimates
+               writer.print(",\n\t\t\"estimates\": [\n");
+
+               addProperty(writer, "Est. Output Size", n.getEstimatedOutputSize() == -1 ? "(unknown)"
+                       : formatNumber(n.getEstimatedOutputSize(), "B"), true);
+               addProperty(writer, "Est. Cardinality", n.getEstimatedNumRecords() == -1 ? "(unknown)"
+                       : formatNumber(n.getEstimatedNumRecords()), false);
+
+               writer.print("\t\t]");
+
+               // output node cost
+               if (p.getNodeCosts() != null) {
+                       writer.print(",\n\t\t\"costs\": [\n");
+
+                       addProperty(writer, "Network", p.getNodeCosts().getNetworkCost() == -1 ? "(unknown)"
+                               : formatNumber(p.getNodeCosts().getNetworkCost(), "B"), true);
+                       addProperty(writer, "Disk I/O", p.getNodeCosts().getDiskCost() == -1 ? "(unknown)"
+                               : formatNumber(p.getNodeCosts().getDiskCost(), "B"), false);
+                       addProperty(writer, "CPU", p.getNodeCosts().getCpuCost() == -1 ? "(unknown)"
+                               : formatNumber(p.getNodeCosts().getCpuCost(), ""), false);
+
+                       addProperty(writer, "Cumulative Network",
+                               p.getCumulativeCosts().getNetworkCost() == -1 ? "(unknown)" : formatNumber(p.getCumulativeCosts().getNetworkCost(), "B"), false);
+                       addProperty(writer, "Cumulative Disk I/O",
+                               p.getCumulativeCosts().getDiskCost() == -1 ? "(unknown)" : formatNumber(p.getCumulativeCosts().getDiskCost(), "B"), false);
+                       addProperty(writer, "Cumulative CPU",
+                               p.getCumulativeCosts().getCpuCost() == -1 ? "(unknown)" : formatNumber(p.getCumulativeCosts().getCpuCost(), ""), false);
+
+                       writer.print("\n\t\t]");
+               }
+
+               // output the node compiler hints
+               if (n.getOperator().getCompilerHints() != null) {
+                       CompilerHints hints = n.getOperator().getCompilerHints();
+                       CompilerHints defaults = new CompilerHints();
+
+                       String size = hints.getOutputSize() == defaults.getOutputSize() ? "(none)" : String.valueOf(hints.getOutputSize());
+                       String card = hints.getOutputCardinality() == defaults.getOutputCardinality() ? "(none)" : String.valueOf(hints.getOutputCardinality());
+                       String width = hints.getAvgOutputRecordSize() == defaults.getAvgOutputRecordSize() ? "(none)" : String.valueOf(hints.getAvgOutputRecordSize());
+                       String filter = hints.getFilterFactor() == defaults.getFilterFactor() ? "(none)" : String.valueOf(hints.getFilterFactor());
+                       
+                       writer.print(",\n\t\t\"compiler_hints\": [\n");
+
+                       addProperty(writer, "Output Size (bytes)", size, true);
+                       addProperty(writer, "Output Cardinality", card, false);
+                       addProperty(writer, "Avg. Output Record Size (bytes)", width, false);
+                       addProperty(writer, "Filter Factor", filter, false);
+
+                       writer.print("\t\t]");
+               }
+
+               // finish node
+               writer.print("\n\t}");
+               return true;
+       }
+
+       private void addProperty(PrintWriter writer, String name, String value, boolean first) {
+               if (!first) {
+                       writer.print(",\n");
+               }
+               writer.print("\t\t\t{ \"name\": \"");
+               writer.print(name);
+               writer.print("\", \"value\": \"");
+               writer.print(value);
+               writer.print("\" }");
+       }
+
+       public static final String formatNumber(double number) {
+               return formatNumber(number, "");
+       }
+
+       public static final String formatNumber(double number, String suffix) {
+               if (number <= 0.0) {
+                       return String.valueOf(number);
+               }
+
+               int power = (int) Math.ceil(Math.log10(number));
+
+               int group = (power - 1) / 3;
+               if (group >= SIZE_SUFFIXES.length) {
+                       group = SIZE_SUFFIXES.length - 1;
+               } else if (group < 0) {
+                       group = 0;
+               }
+
+               // truncate fractional part
+               int beforeDecimal = power - group * 3;
+               if (power > beforeDecimal) {
+                       for (int i = power - beforeDecimal; i > 0; i--) {
+                               number /= 10;
+                       }
+               }
+               
+               // append the scale character (e.g., 'K', 'M') and the unit suffix (e.g., "B" for bytes)
+               String formatted = group > 0 ? String.format(Locale.US, "%.2f %s", number, SIZE_SUFFIXES[group]) :
+                       String.format(Locale.US, "%.2f ", number);
+               return (formatted + suffix).trim();
+       }
+
+       private static final char[] SIZE_SUFFIXES = { 0, 'K', 'M', 'G', 'T' };
+}

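To round off, a hedged usage sketch (not part of this commit) showing the generator's two output paths; 'plan' is assumed to be an OptimizedPlan produced by the Flink optimizer:

    import java.io.File;
    import java.io.IOException;

    import org.apache.flink.optimizer.plan.OptimizedPlan;
    import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;

    public final class DumpExample {

        private DumpExample() {}

        public static void dump(OptimizedPlan plan) throws IOException {
            PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
            dumper.setEncodeForHTML(true); // escape node contents for embedding in HTML

            // render to an in-memory JSON string ...
            String json = dumper.getOptimizerPlanAsJSON(plan);
            System.out.println(json);

            // ... or write straight to a file
            dumper.dumpOptimizerPlanAsJSON(plan, new File("plan.json"));
        }
    }

Note that the PrintWriter overloads leave flushing and closing to the caller, while the String and File variants handle that internally.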