http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
deleted file mode 100644
index c2e81a8..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.util.Visitor;
-
-/**
- * Visitor that computes the interesting properties for each node in the 
optimizer DAG. On its recursive
- * depth-first descend, it propagates all interesting properties top-down.
- */
-public class InterestingPropertyVisitor implements Visitor<OptimizerNode> {
-
-       private CostEstimator estimator; // the cost estimator for maximal 
costs of an interesting property
-
-       /**
-        * Creates a new visitor that computes the interesting properties for 
all nodes in the plan.
-        * It uses the given cost estimator used to compute the maximal costs 
for an interesting property.
-        *
-        * @param estimator
-        *        The cost estimator to estimate the maximal costs for 
interesting properties.
-        */
-       public InterestingPropertyVisitor(CostEstimator estimator) {
-               this.estimator = estimator;
-       }
-
-       @Override
-       public boolean preVisit(OptimizerNode node) {
-               // The interesting properties must be computed on the descend. 
In case a node has multiple outputs,
-               // that computation must happen during the last descend.
-
-               if (node.getInterestingProperties() == null && 
node.haveAllOutputConnectionInterestingProperties()) {
-                       
node.computeUnionOfInterestingPropertiesFromSuccessors();
-                       
node.computeInterestingPropertiesForInputs(this.estimator);
-                       return true;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public void postVisit(OptimizerNode visitable) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
deleted file mode 100644
index 58aa3c1..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.IterationPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * This visitor traverses the selected execution plan and finalizes it:
- *
- * <ul>
- *     <li>The graph of nodes is double-linked (links from child to parent are 
inserted).</li>
- *     <li>If unions join static and dynamic paths, the cache is marked as a 
memory consumer.</li>
- *     <li>Relative memory fractions are assigned to all nodes.</li>
- *     <li>All nodes are collected into a set.</li>
- * </ul>
- */
-public class PlanFinalizer implements Visitor<PlanNode> {
-
-       private final Set<PlanNode> allNodes; // a set of all nodes in the 
optimizer plan
-
-       private final List<SourcePlanNode> sources; // all data source nodes in 
the optimizer plan
-
-       private final List<SinkPlanNode> sinks; // all data sink nodes in the 
optimizer plan
-
-       private final Deque<IterationPlanNode> stackOfIterationNodes;
-
-       private int memoryConsumerWeights; // a counter of all memory consumers
-
-       /**
-        * Creates a new plan finalizer.
-        */
-       public PlanFinalizer() {
-               this.allNodes = new HashSet<PlanNode>();
-               this.sources = new ArrayList<SourcePlanNode>();
-               this.sinks = new ArrayList<SinkPlanNode>();
-               this.stackOfIterationNodes = new 
ArrayDeque<IterationPlanNode>();
-       }
-
-       public OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String 
jobName, Plan originalPlan) {
-               this.memoryConsumerWeights = 0;
-
-               // traverse the graph
-               for (SinkPlanNode node : sinks) {
-                       node.accept(this);
-               }
-
-               // assign the memory to each node
-               if (this.memoryConsumerWeights > 0) {
-                       for (PlanNode node : this.allNodes) {
-                               // assign memory to the driver strategy of the 
node
-                               final int consumerWeight = 
node.getMemoryConsumerWeight();
-                               if (consumerWeight > 0) {
-                                       final double relativeMem = 
(double)consumerWeight / this.memoryConsumerWeights;
-                                       
node.setRelativeMemoryPerSubtask(relativeMem);
-                                       if (Optimizer.LOG.isDebugEnabled()) {
-                                               Optimizer.LOG.debug("Assigned " 
+ relativeMem + " of total memory to each subtask of " +
-                                                       
node.getProgramOperator().getName() + ".");
-                                       }
-                               }
-
-                               // assign memory to the local and global 
strategies of the channels
-                               for (Channel c : node.getInputs()) {
-                                       if (c.getLocalStrategy().dams()) {
-                                               final double relativeMem = 1.0 
/ this.memoryConsumerWeights;
-                                               
c.setRelativeMemoryLocalStrategy(relativeMem);
-                                               if 
(Optimizer.LOG.isDebugEnabled()) {
-                                                       
Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each local 
strategy " +
-                                                                       
"instance of " + c + ".");
-                                               }
-                                       }
-                                       if (c.getTempMode() != TempMode.NONE) {
-                                               final double relativeMem = 1.0/ 
this.memoryConsumerWeights;
-                                               
c.setRelativeTempMemory(relativeMem);
-                                               if 
(Optimizer.LOG.isDebugEnabled()) {
-                                                       
Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each 
instance of the temp " +
-                                                                       "table 
for " + c + ".");
-                                               }
-                                       }
-                               }
-                       }
-               }
-               return new OptimizedPlan(this.sources, this.sinks, 
this.allNodes, jobName, originalPlan);
-       }
-
-       @Override
-       public boolean preVisit(PlanNode visitable) {
-               // if we come here again, prevent a further descend
-               if (!this.allNodes.add(visitable)) {
-                       return false;
-               }
-
-               if (visitable instanceof SinkPlanNode) {
-                       this.sinks.add((SinkPlanNode) visitable);
-               }
-               else if (visitable instanceof SourcePlanNode) {
-                       this.sources.add((SourcePlanNode) visitable);
-               }
-               else if (visitable instanceof BinaryUnionPlanNode) {
-                       BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) 
visitable;
-                       if (unionNode.unionsStaticAndDynamicPath()) {
-                               
unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
-                       }
-               }
-               else if (visitable instanceof BulkPartialSolutionPlanNode) {
-                       // tell the partial solution about the iteration node 
that contains it
-                       final BulkPartialSolutionPlanNode pspn = 
(BulkPartialSolutionPlanNode) visitable;
-                       final IterationPlanNode iteration = 
this.stackOfIterationNodes.peekLast();
-
-                       // sanity check!
-                       if (iteration == null || !(iteration instanceof 
BulkIterationPlanNode)) {
-                               throw new CompilerException("Bug: Error 
finalizing the plan. " +
-                                               "Cannot associate the node for 
a partial solutions with its containing iteration.");
-                       }
-                       pspn.setContainingIterationNode((BulkIterationPlanNode) 
iteration);
-               }
-               else if (visitable instanceof WorksetPlanNode) {
-                       // tell the partial solution about the iteration node 
that contains it
-                       final WorksetPlanNode wspn = (WorksetPlanNode) 
visitable;
-                       final IterationPlanNode iteration = 
this.stackOfIterationNodes.peekLast();
-
-                       // sanity check!
-                       if (iteration == null || !(iteration instanceof 
WorksetIterationPlanNode)) {
-                               throw new CompilerException("Bug: Error 
finalizing the plan. " +
-                                               "Cannot associate the node for 
a partial solutions with its containing iteration.");
-                       }
-                       
wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
-               }
-               else if (visitable instanceof SolutionSetPlanNode) {
-                       // tell the partial solution about the iteration node 
that contains it
-                       final SolutionSetPlanNode sspn = (SolutionSetPlanNode) 
visitable;
-                       final IterationPlanNode iteration = 
this.stackOfIterationNodes.peekLast();
-
-                       // sanity check!
-                       if (iteration == null || !(iteration instanceof 
WorksetIterationPlanNode)) {
-                               throw new CompilerException("Bug: Error 
finalizing the plan. " +
-                                               "Cannot associate the node for 
a partial solutions with its containing iteration.");
-                       }
-                       
sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
-               }
-
-               // double-connect the connections. previously, only parents 
knew their children, because
-               // one child candidate could have been referenced by multiple 
parents.
-               for (Channel conn : visitable.getInputs()) {
-                       conn.setTarget(visitable);
-                       conn.getSource().addOutgoingChannel(conn);
-               }
-
-               for (Channel c : visitable.getBroadcastInputs()) {
-                       c.setTarget(visitable);
-                       c.getSource().addOutgoingChannel(c);
-               }
-
-               // count the memory consumption
-               this.memoryConsumerWeights += 
visitable.getMemoryConsumerWeight();
-               for (Channel c : visitable.getInputs()) {
-                       if (c.getLocalStrategy().dams()) {
-                               this.memoryConsumerWeights++;
-                       }
-                       if (c.getTempMode() != TempMode.NONE) {
-                               this.memoryConsumerWeights++;
-                       }
-               }
-               for (Channel c : visitable.getBroadcastInputs()) {
-                       if (c.getLocalStrategy().dams()) {
-                               this.memoryConsumerWeights++;
-                       }
-                       if (c.getTempMode() != TempMode.NONE) {
-                               this.memoryConsumerWeights++;
-                       }
-               }
-
-               // pass the visitor to the iteraton's step function
-               if (visitable instanceof IterationPlanNode) {
-                       // push the iteration node onto the stack
-                       final IterationPlanNode iterNode = (IterationPlanNode) 
visitable;
-                       this.stackOfIterationNodes.addLast(iterNode);
-
-                       // recurse
-                       ((IterationPlanNode) 
visitable).acceptForStepFunction(this);
-
-                       // pop the iteration node from the stack
-                       this.stackOfIterationNodes.removeLast();
-               }
-               return true;
-       }
-
-       @Override
-       public void postVisit(PlanNode visitable) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
deleted file mode 100644
index c0dc4dd..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.IterationNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.util.Visitor;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A traversal that goes over the program data flow of an iteration and makes 
the nodes
- * that depend on the partial solution (the data set recomputed in each 
iteration) as "dynamic"
- * and the other nodes as "static".
- */
-public class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {
-
-       private final Set<OptimizerNode> seenBefore = new 
HashSet<OptimizerNode>();
-
-       private final int costWeight;
-
-       public StaticDynamicPathIdentifier(int costWeight) {
-               this.costWeight = costWeight;
-       }
-
-       @Override
-       public boolean preVisit(OptimizerNode visitable) {
-               return this.seenBefore.add(visitable);
-       }
-
-       @Override
-       public void postVisit(OptimizerNode visitable) {
-               visitable.identifyDynamicPath(this.costWeight);
-
-               // check that there is no nested iteration on the dynamic path
-               if (visitable.isOnDynamicPath() && visitable instanceof 
IterationNode) {
-                       throw new CompilerException("Nested iterations are 
currently not supported.");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
deleted file mode 100644
index d359490..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.util.Visitor;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A traversal that checks if the Workset of a delta iteration is used in the 
data flow
- * of its step function.
- */
-public class StepFunctionValidator implements Visitor<Operator<?>> {
-
-       private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>();
-
-       private boolean foundWorkset;
-
-       @Override
-       public boolean preVisit(Operator<?> visitable) {
-               if (visitable instanceof DeltaIterationBase.WorksetPlaceHolder) 
{
-                       foundWorkset = true;
-               }
-
-               return (!foundWorkset) && seenBefore.add(visitable);
-       }
-
-       @Override
-       public void postVisit(Operator<?> visitable) {}
-
-       public boolean hasFoundWorkset() {
-               return foundWorkset;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
deleted file mode 100644
index cd8766c..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains the various traversals over the program plan and the
- * optimizer DAG (directed acyclic graph) that are made in the course of
- * the optimization.
- *
- * The traversals are mostly implemented as a {@link 
org.apache.flink.util.Visitor} that
- * traversed the program flow.
- */
-package org.apache.flink.optimizer.traversals;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
deleted file mode 100644
index 5110849..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.NoOpFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.DualInputOperator;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Key;
-
-
-public class NoOpBinaryUdfOp<OUT> extends DualInputOperator<OUT, OUT, OUT, 
NoOpFunction> implements RecordOperator {
-
-       public NoOpBinaryUdfOp(TypeInformation<OUT> type) {
-               super(new 
UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), new 
BinaryOperatorInformation<OUT, OUT, OUT>(type, type, type), "NoContract");
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public Class<? extends Key<?>>[] getKeyClasses() {
-               return (Class<? extends Key<?>>[]) new Class[0];
-       }
-
-       @Override
-       protected List<OUT> executeOnCollections(List<OUT> inputData1, 
List<OUT> inputData2, RuntimeContext runtimeContext, ExecutionConfig 
executionConfig) {
-               throw new UnsupportedOperationException();
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
deleted file mode 100644
index cc4a4d6..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.NoOpFunction;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Key;
-
-
-public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, 
NoOpFunction> implements RecordOperator {
-       
-       @SuppressWarnings("rawtypes")
-       public static final NoOpUnaryUdfOp INSTANCE = new NoOpUnaryUdfOp();
-       
-       private NoOpUnaryUdfOp() {
-               // pass null here because we override getOutputType to return 
type
-               // of input operator
-               super(new 
UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), null, "");
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public Class<? extends Key<?>>[] getKeyClasses() {
-               return (Class<? extends Key<?>>[]) new Class[0];
-       }
-
-       @Override
-       public UnaryOperatorInformation<OUT, OUT> getOperatorInfo() {
-               TypeInformation<OUT> previousOut = 
input.getOperatorInfo().getOutputType();
-               return new UnaryOperatorInformation<OUT, OUT>(previousOut, 
previousOut);
-       }
-
-       @Override
-       protected List<OUT> executeOnCollections(List<OUT> inputData, 
RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
-               return inputData;
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java
deleted file mode 100644
index d8f33a2..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.util;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-
-
-/**
- * 
- */
-public class Utils
-{
-       public static final FieldList createOrderedFromSet(FieldSet set) {
-               if (set instanceof FieldList) {
-                       return (FieldList) set;
-               } else {
-                       final int[] cols = set.toArray();
-                       Arrays.sort(cols);
-                       return new FieldList(cols);
-               }
-       }
-       
-       public static final Ordering createOrdering(FieldList fields, boolean[] 
directions) {
-               final Ordering o = new Ordering();
-               for (int i = 0; i < fields.size(); i++) {
-                       o.appendOrdering(fields.get(i), null, directions == 
null || directions[i] ? Order.ASCENDING : Order.DESCENDING);
-               }
-               return o;
-       }
-       
-       public static final Ordering createOrdering(FieldList fields) {
-               final Ordering o = new Ordering();
-               for (int i = 0; i < fields.size(); i++) {
-                       o.appendOrdering(fields.get(i), null, Order.ANY);
-               }
-               return o;
-       }
-       
-       public static boolean[] getDirections(Ordering o, int numFields) {
-               final boolean[] dirs = o.getFieldSortDirections();
-               if (dirs.length == numFields) {
-                       return dirs;
-               } else if (dirs.length > numFields) {
-                       final boolean[] subSet = new boolean[numFields];
-                       System.arraycopy(dirs, 0, subSet, 0, numFields);
-                       return subSet;
-               } else {
-                       throw new CompilerException();
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * No instantiation.
-        */
-       private Utils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
deleted file mode 100644
index 1e4bafb..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.CrossWithLargeOperator;
-import org.apache.flink.api.java.record.operators.CrossWithSmallOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-/**
-* Tests that validate optimizer choices when using operators that are 
requesting certain specific execution
-* strategies.
-*/
-@SuppressWarnings({"serial", "deprecation"})
-public class AdditionalOperatorsTest extends CompilerTestBase {
-
-       @Test
-       public void testCrossWithSmall() {
-               // construct the plan
-               FileDataSource source1 = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source 1");
-               FileDataSource source2 = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source 2");
-               
-               CrossOperator cross = CrossWithSmallOperator.builder(new 
DummyCrossStub())
-                               .input1(source1).input2(source2)
-                               .name("Cross").build();
-       
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), 
OUT_FILE, cross, "Sink");
-               
-               Plan plan = new Plan(sink);
-               plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-               
-               
-               try {
-                       OptimizedPlan oPlan = compileNoStats(plan);
-                       OptimizerPlanNodeResolver resolver = new 
OptimizerPlanNodeResolver(oPlan);
-                       
-                       DualInputPlanNode crossPlanNode = 
resolver.getNode("Cross");
-                       Channel in1 = crossPlanNode.getInput1();
-                       Channel in2 = crossPlanNode.getInput2();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
in1.getShipStrategy());
-                       assertEquals(ShipStrategyType.BROADCAST, 
in2.getShipStrategy());
-               } catch(CompilerException ce) {
-                       ce.printStackTrace();
-                       fail("The pact compiler is unable to compile this plan 
correctly.");
-               }
-       }
-       
-       @Test
-       public void testCrossWithLarge() {
-               // construct the plan
-               FileDataSource source1 = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source 1");
-               FileDataSource source2 = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source 2");
-               
-               CrossOperator cross= CrossWithLargeOperator.builder(new 
DummyCrossStub())
-                               .input1(source1).input2(source2)
-                               .name("Cross").build();
-       
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), 
OUT_FILE, cross, "Sink");
-               
-               Plan plan = new Plan(sink);
-               plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-               
-               
-               try {
-                       OptimizedPlan oPlan = compileNoStats(plan);
-                       OptimizerPlanNodeResolver resolver = new 
OptimizerPlanNodeResolver(oPlan);
-                       
-                       DualInputPlanNode crossPlanNode = 
resolver.getNode("Cross");
-                       Channel in1 = crossPlanNode.getInput1();
-                       Channel in2 = crossPlanNode.getInput2();
-                       
-                       assertEquals(ShipStrategyType.BROADCAST, 
in1.getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
in2.getShipStrategy());
-               } catch(CompilerException ce) {
-                       ce.printStackTrace();
-                       fail("The pact compiler is unable to compile this plan 
correctly.");
-               }
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
deleted file mode 100644
index 916aa27..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ /dev/null
@@ -1,1039 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-
-@SuppressWarnings({"serial", "deprecation"})
-public class BranchingPlansCompilerTest extends CompilerTestBase {
-       
-       
-       @Test
-       public void testCostComputationWithMultipleDataSinks() {
-               final int SINKS = 5;
-       
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-                       DataSet<Long> source = env.generateSequence(1, 10000);
-
-                       DataSet<Long> mappedA = source.map(new 
IdentityMapper<Long>());
-                       DataSet<Long> mappedC = source.map(new 
IdentityMapper<Long>());
-
-                       for (int sink = 0; sink < SINKS; sink++) {
-                               mappedA.output(new 
DiscardingOutputFormat<Long>());
-                               mappedC.output(new 
DiscardingOutputFormat<Long>());
-                       }
-
-                       Plan plan = env.createProgramPlan("Plans With Multiple 
Data Sinks");
-                       OptimizedPlan oPlan = compileNoStats(plan);
-
-                       new JobGraphGenerator().compileJobGraph(oPlan);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-
-       /**
-        * 
-        * <pre>
-        *                (SRC A)  
-        *                   |
-        *                (MAP A)
-        *             /         \   
-        *          (MAP B)      (MAP C)
-        *           /           /     \
-        *        (SINK A)    (SINK B)  (SINK C)
-        * </pre>
-        */
-       @SuppressWarnings("unchecked")
-       @Test
-       public void testBranchingWithMultipleDataSinks2() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-                       DataSet<Long> source = env.generateSequence(1, 10000);
-
-                       DataSet<Long> mappedA = source.map(new 
IdentityMapper<Long>());
-                       DataSet<Long> mappedB = mappedA.map(new 
IdentityMapper<Long>());
-                       DataSet<Long> mappedC = mappedA.map(new 
IdentityMapper<Long>());
-
-                       mappedB.output(new DiscardingOutputFormat<Long>());
-                       mappedC.output(new DiscardingOutputFormat<Long>());
-                       mappedC.output(new DiscardingOutputFormat<Long>());
-
-                       Plan plan = env.createProgramPlan();
-                       Set<Operator<?>> sinks = new 
HashSet<Operator<?>>(plan.getDataSinks());
-
-                       OptimizedPlan oPlan = compileNoStats(plan);
-
-                       // ---------- check the optimizer plan ----------
-
-                       // number of sinks
-                       assertEquals("Wrong number of data sinks.", 3, 
oPlan.getDataSinks().size());
-
-                       // remove matching sinks to check relation
-                       for (SinkPlanNode sink : oPlan.getDataSinks()) {
-                               
assertTrue(sinks.remove(sink.getProgramOperator()));
-                       }
-                       assertTrue(sinks.isEmpty());
-
-                       new JobGraphGenerator().compileJobGraph(oPlan);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-
-       /**
-        * <pre>
-        *                              SINK
-        *                               |
-        *                            COGROUP
-        *                        +---/    \----+
-        *                       /               \
-        *                      /             MATCH10
-        *                     /               |    \
-        *                    /                |  MATCH9
-        *                MATCH5               |  |   \
-        *                |   \                |  | MATCH8
-        *                | MATCH4             |  |  |   \
-        *                |  |   \             |  |  | MATCH7
-        *                |  | MATCH3          |  |  |  |   \
-        *                |  |  |   \          |  |  |  | MATCH6
-        *                |  |  | MATCH2       |  |  |  |  |  |
-        *                |  |  |  |   \       +--+--+--+--+--+
-        *                |  |  |  | MATCH1            MAP 
-        *                \  |  |  |  |  | /-----------/
-        *                (DATA SOURCE ONE)
-        * </pre>
-        */
-       @Test
-       public void testBranchingSourceMultipleTimes() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-                       DataSet<Tuple2<Long, Long>> source = 
env.generateSequence(1, 10000000)
-                               .map(new Duplicator<Long>());
-
-                       DataSet<Tuple2<Long, Long>> joined1 = 
source.join(source).where(0).equalTo(0)
-                                                                               
                                .with(new DummyFlatJoinFunction<Tuple2<Long, 
Long>>());
-
-                       DataSet<Tuple2<Long, Long>> joined2 = 
source.join(joined1).where(0).equalTo(0)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-                       DataSet<Tuple2<Long, Long>> joined3 = 
source.join(joined2).where(0).equalTo(0)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-                       DataSet<Tuple2<Long, Long>> joined4 = 
source.join(joined3).where(0).equalTo(0)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-                       DataSet<Tuple2<Long, Long>> joined5 = 
source.join(joined4).where(0).equalTo(0)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-                       DataSet<Tuple2<Long, Long>> mapped = source.map(
-                                       new MapFunction<Tuple2<Long, Long>, 
Tuple2<Long, Long>>() {
-                                               @Override
-                                               public Tuple2<Long, Long> 
map(Tuple2<Long, Long> value) {
-                                                       return null;
-                                               }
-                       });
-
-                       DataSet<Tuple2<Long, Long>> joined6 = 
mapped.join(mapped).where(0).equalTo(0)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-                       DataSet<Tuple2<Long, Long>> joined7 = 
mapped.join(joined6).where(0).equalTo(0)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-                       DataSet<Tuple2<Long, Long>> joined8 = 
mapped.join(joined7).where(0).equalTo(0)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-                       DataSet<Tuple2<Long, Long>> joined9 = 
mapped.join(joined8).where(0).equalTo(0)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-                       DataSet<Tuple2<Long, Long>> joined10 = 
mapped.join(joined9).where(0).equalTo(0)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-
-                       joined5.coGroup(joined10)
-                                       .where(1).equalTo(1)
-                                       .with(new 
DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
-
-                               .output(new 
DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
-
-                       Plan plan = env.createProgramPlan();
-                       OptimizedPlan oPlan = compileNoStats(plan);
-                       new JobGraphGenerator().compileJobGraph(oPlan);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       /**
-        * 
-        * <pre>
-
-        *              (SINK A)
-        *                  |    (SINK B)    (SINK C)
-        *                CROSS    /          /
-        *               /     \   |  +------+
-        *              /       \  | /
-        *          REDUCE      MATCH2
-        *             |    +---/    \
-        *              \  /          |
-        *               MAP          |
-        *                |           |
-        *             COGROUP      MATCH1
-        *             /     \     /     \
-        *        (SRC A)    (SRC B)    (SRC C)
-        * </pre>
-        */
-       @Test
-       public void testBranchingWithMultipleDataSinks() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-                       DataSet<Tuple2<Long, Long>> sourceA = 
env.generateSequence(1, 10000000)
-                                       .map(new Duplicator<Long>());
-
-                       DataSet<Tuple2<Long, Long>> sourceB = 
env.generateSequence(1, 10000000)
-                                       .map(new Duplicator<Long>());
-
-                       DataSet<Tuple2<Long, Long>> sourceC = 
env.generateSequence(1, 10000000)
-                                       .map(new Duplicator<Long>());
-
-                       DataSet<Tuple2<Long, Long>> mapped = 
sourceA.coGroup(sourceB)
-                                       .where(0).equalTo(1)
-                                       .with(new CoGroupFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
-                                                       @Override
-                                                       public void 
coGroup(Iterable<Tuple2<Long, Long>> first,
-                                                                               
                        Iterable<Tuple2<Long, Long>> second,
-                                                                               
                        Collector<Tuple2<Long, Long>> out) {
-                                                         }
-                                       })
-                                       .map(new IdentityMapper<Tuple2<Long, 
Long>>());
-
-                       DataSet<Tuple2<Long, Long>> joined = 
sourceB.join(sourceC)
-                                       .where(0).equalTo(1)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-                       DataSet<Tuple2<Long, Long>> joined2 = 
mapped.join(joined)
-                                       .where(1).equalTo(1)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-                       DataSet<Tuple2<Long, Long>> reduced = mapped
-                                       .groupBy(1)
-                                       .reduceGroup(new 
Top1GroupReducer<Tuple2<Long, Long>>());
-
-                       reduced.cross(joined2)
-                                       .output(new 
DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
-
-                       joined2.output(new DiscardingOutputFormat<Tuple2<Long, 
Long>>());
-                       joined2.output(new DiscardingOutputFormat<Tuple2<Long, 
Long>>());
-
-                       Plan plan = env.createProgramPlan();
-                       OptimizedPlan oPlan = compileNoStats(plan);
-                       new JobGraphGenerator().compileJobGraph(oPlan);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @SuppressWarnings("unchecked")
-       @Test
-       public void testBranchEachContractType() {
-               try {
-                       // construct the plan
-                       FileDataSource sourceA = new FileDataSource(new 
DummyInputFormat(), "file:///test/file1", "Source A");
-                       FileDataSource sourceB = new FileDataSource(new 
DummyInputFormat(), "file:///test/file2", "Source B");
-                       FileDataSource sourceC = new FileDataSource(new 
DummyInputFormat(), "file:///test/file3", "Source C");
-                       
-                       MapOperator map1 = MapOperator.builder(new 
IdentityMap()).input(sourceA).name("Map 1").build();
-                       
-                       ReduceOperator reduce1 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0)
-                               .input(map1)
-                               .name("Reduce 1")
-                               .build();
-                       
-                       JoinOperator match1 = JoinOperator.builder(new 
DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(sourceB, sourceB, sourceC)
-                               .input2(sourceC)
-                               .name("Match 1")
-                               .build();
-                       ;
-                       CoGroupOperator cogroup1 = CoGroupOperator.builder(new 
DummyCoGroupStub(), IntValue.class, 0,0)
-                               .input1(sourceA)
-                               .input2(sourceB)
-                               .name("CoGroup 1")
-                               .build();
-                       
-                       CrossOperator cross1 = CrossOperator.builder(new 
DummyCrossStub())
-                               .input1(reduce1)
-                               .input2(cogroup1)
-                               .name("Cross 1")
-                               .build();
-                       
-                       
-                       CoGroupOperator cogroup2 = CoGroupOperator.builder(new 
DummyCoGroupStub(), IntValue.class, 0,0)
-                               .input1(cross1)
-                               .input2(cross1)
-                               .name("CoGroup 2")
-                               .build();
-                       
-                       CoGroupOperator cogroup3 = CoGroupOperator.builder(new 
DummyCoGroupStub(), IntValue.class, 0,0)
-                               .input1(map1)
-                               .input2(match1)
-                               .name("CoGroup 3")
-                               .build();
-                       
-                       
-                       MapOperator map2 = MapOperator.builder(new 
IdentityMap()).input(cogroup3).name("Map 2").build();
-                       
-                       CoGroupOperator cogroup4 = CoGroupOperator.builder(new 
DummyCoGroupStub(), IntValue.class, 0,0)
-                               .input1(map2)
-                               .input2(match1)
-                               .name("CoGroup 4")
-                               .build();
-                       
-                       CoGroupOperator cogroup5 = CoGroupOperator.builder(new 
DummyCoGroupStub(), IntValue.class, 0,0)
-                               .input1(cogroup2)
-                               .input2(cogroup1)
-                               .name("CoGroup 5")
-                               .build();
-                       
-                       CoGroupOperator cogroup6 = CoGroupOperator.builder(new 
DummyCoGroupStub(), IntValue.class, 0,0)
-                               .input1(reduce1)
-                               .input2(cogroup4)
-                               .name("CoGroup 6")
-                               .build();
-                       
-                       CoGroupOperator cogroup7 = CoGroupOperator.builder(new 
DummyCoGroupStub(), IntValue.class, 0,0)
-                               .input1(cogroup5)
-                               .input2(cogroup6)
-                               .name("CoGroup 7")
-                               .build();
-                       
-                       FileDataSink sink = new FileDataSink(new 
DummyOutputFormat(), OUT_FILE, cogroup7);
-                       sink.addInput(sourceA);
-                       sink.addInput(cogroup3);
-                       sink.addInput(cogroup4);
-                       sink.addInput(cogroup1);
-                       
-                       // return the PACT plan
-                       Plan plan = new Plan(sink, "Branching of each contract 
type");
-                       
-                       OptimizedPlan oPlan = compileNoStats(plan);
-                       
-                       JobGraphGenerator jobGen = new JobGraphGenerator();
-                       
-                       //Compile plan to verify that no error is thrown
-                       jobGen.compileJobGraph(oPlan);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-       
-
-       @Test
-       public void testBranchingUnion() {
-               try {
-                       // construct the plan
-                       FileDataSource source1 = new FileDataSource(new 
DummyInputFormat(), IN_FILE);
-                       FileDataSource source2 = new FileDataSource(new 
DummyInputFormat(), IN_FILE);
-                       
-                       JoinOperator mat1 = JoinOperator.builder(new 
DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(source1)
-                               .input2(source2)
-                               .name("Match 1")
-                               .build();
-                       
-                       MapOperator ma1 = MapOperator.builder(new 
IdentityMap()).input(mat1).name("Map1").build();
-                       
-                       ReduceOperator r1 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0)
-                               .input(ma1)
-                               .name("Reduce 1")
-                               .build();
-                       
-                       ReduceOperator r2 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0)
-                               .input(mat1)
-                               .name("Reduce 2")
-                               .build();
-                       
-                       MapOperator ma2 = MapOperator.builder(new 
IdentityMap()).input(mat1).name("Map 2").build();
-                       
-                       MapOperator ma3 = MapOperator.builder(new 
IdentityMap()).input(ma2).name("Map 3").build();
-                       
-                       @SuppressWarnings("unchecked")
-                       JoinOperator mat2 = JoinOperator.builder(new 
DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(r1, r2, ma2, ma3)
-                               .input2(ma2)
-                               .name("Match 2")
-                               .build();
-                       mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, 
Optimizer.HINT_LOCAL_STRATEGY_MERGE);
-                       
-                       FileDataSink sink = new FileDataSink(new 
DummyOutputFormat(), OUT_FILE, mat2);
-                       
-                       
-                       // return the PACT plan
-                       Plan plan = new Plan(sink, "Branching Union");
-                       
-                       OptimizedPlan oPlan = compileNoStats(plan);
-                       
-                       JobGraphGenerator jobGen = new JobGraphGenerator();
-                       
-                       //Compile plan to verify that no error is thrown
-                       jobGen.compileJobGraph(oPlan);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-       
-       /**
-        * 
-        * <pre>
-        *             (SRC A)     
-        *             /     \      
-        *        (SINK A)    (SINK B)
-        * </pre>
-        */
-       @Test
-       public void testBranchingWithMultipleDataSinksSmall() {
-               try {
-                       // construct the plan
-                       final String out1Path = "file:///test/1";
-                       final String out2Path = "file:///test/2";
-       
-                       FileDataSource sourceA = new 
FileDataSource(DummyInputFormat.class, IN_FILE);
-                       
-                       FileDataSink sinkA = new 
FileDataSink(DummyOutputFormat.class, out1Path, sourceA);
-                       FileDataSink sinkB = new 
FileDataSink(DummyOutputFormat.class, out2Path, sourceA);
-                       
-                       List<FileDataSink> sinks = new 
ArrayList<FileDataSink>();
-                       sinks.add(sinkA);
-                       sinks.add(sinkB);
-                       
-                       // return the PACT plan
-                       Plan plan = new Plan(sinks, "Plans With Multiple Data 
Sinks");
-                       
-                       OptimizedPlan oPlan = compileNoStats(plan);
-                       
-                       // ---------- check the optimizer plan ----------
-                       
-                       // number of sinks
-                       Assert.assertEquals("Wrong number of data sinks.", 2, 
oPlan.getDataSinks().size());
-                       
-                       // sinks contain all sink paths
-                       Set<String> allSinks = new HashSet<String>();
-                       allSinks.add(out1Path);
-                       allSinks.add(out2Path);
-                       
-                       for (SinkPlanNode n : oPlan.getDataSinks()) {
-                               String path = ((FileDataSink) 
n.getSinkNode().getOperator()).getFilePath();
-                               Assert.assertTrue("Invalid data sink.", 
allSinks.remove(path));
-                       }
-                       
-                       // ---------- compile plan to nephele job graph to 
verify that no error is thrown ----------
-                       
-                       JobGraphGenerator jobGen = new JobGraphGenerator();
-                       jobGen.compileJobGraph(oPlan);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-       
-       /**
-        * 
-        * <pre>
-        *     (SINK 3) (SINK 1)   (SINK 2) (SINK 4)
-        *         \     /             \     /
-        *         (SRC A)             (SRC B)
-        * </pre>
-        * 
-        * NOTE: this case is currently not caught by the compiler. we should 
enable the test once it is caught.
-        */
-       @Test
-       public void testBranchingDisjointPlan() {
-               // construct the plan
-               final String out1Path = "file:///test/1";
-               final String out2Path = "file:///test/2";
-               final String out3Path = "file:///test/3";
-               final String out4Path = "file:///test/4";
-
-               FileDataSource sourceA = new 
FileDataSource(DummyInputFormat.class, IN_FILE);
-               FileDataSource sourceB = new 
FileDataSource(DummyInputFormat.class, IN_FILE);
-               
-               FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, 
out1Path, sourceA, "1");
-               FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, 
out2Path, sourceB, "2");
-               FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, 
out3Path, sourceA, "3");
-               FileDataSink sink4 = new FileDataSink(DummyOutputFormat.class, 
out4Path, sourceB, "4");
-               
-               
-               List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-               sinks.add(sink1);
-               sinks.add(sink2);
-               sinks.add(sink3);
-               sinks.add(sink4);
-               
-               // return the PACT plan
-               Plan plan = new Plan(sinks, "Disjoint plan with multiple data 
sinks and branches");
-               compileNoStats(plan);
-       }
-       
-       @Test
-       public void testBranchAfterIteration() {
-               FileDataSource sourceA = new 
FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-               
-               BulkIteration iteration = new BulkIteration("Loop");
-               iteration.setInput(sourceA);
-               iteration.setMaximumNumberOfIterations(10);
-               
-               MapOperator mapper = 
MapOperator.builder(IdentityMap.class).name("Mapper").input(iteration.getPartialSolution()).build();
-               iteration.setNextPartialSolution(mapper);
-               
-               FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, 
OUT_FILE, iteration, "Sink 1");
-               
-               MapOperator postMap = 
MapOperator.builder(IdentityMap.class).name("Post Iteration Mapper")
-                               .input(iteration).build();
-               
-               FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, 
OUT_FILE, postMap, "Sink 2");
-               
-               List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-               sinks.add(sink1);
-               sinks.add(sink2);
-               
-               Plan plan = new Plan(sinks);
-               
-               try {
-                       compileNoStats(plan);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBranchBeforeIteration() {
-               FileDataSource source1 = new 
FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-               FileDataSource source2 = new 
FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-               
-               BulkIteration iteration = new BulkIteration("Loop");
-               iteration.setInput(source2);
-               iteration.setMaximumNumberOfIterations(10);
-               
-               MapOperator inMap = MapOperator.builder(new IdentityMap())
-                                                      .input(source1)
-                                                      .name("In Iteration Map")
-                                                      
.setBroadcastVariable("BC", iteration.getPartialSolution())
-                                                      .build();
-               
-               iteration.setNextPartialSolution(inMap);
-               
-               MapOperator postMap = MapOperator.builder(new IdentityMap())
-                                                                               
 .input(source1)
-                                                                               
 .name("Post Iteration Map")
-                                                                               
 .setBroadcastVariable("BC", iteration)
-                                                                               
 .build();
-               
-               FileDataSink sink = new FileDataSink(DummyOutputFormat.class, 
OUT_FILE, postMap, "Sink");
-               
-               Plan plan = new Plan(sink);
-               
-               try {
-                       compileNoStats(plan);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-
-       /**
-        * Test to ensure that sourceA is inside as well as outside of the 
iteration the same
-        * node.
-        *
-        * <pre>
-        *       (SRC A)               (SRC B)
-        *      /       \             /       \
-        *  (SINK 1)   (ITERATION)    |     (SINK 2)
-        *             /        \     /
-        *         (SINK 3)     (CROSS => NEXT PARTIAL SOLUTION)
-        * </pre>
-        */
-       @Test
-       public void testClosure() {
-               FileDataSource sourceA = new 
FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-               FileDataSource sourceB = new 
FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-
-               FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, 
OUT_FILE, sourceA, "Sink 1");
-               FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, 
OUT_FILE, sourceB, "Sink 2");
-
-               BulkIteration iteration = new BulkIteration("Loop");
-               iteration.setInput(sourceA);
-               iteration.setMaximumNumberOfIterations(10);
-
-               CrossOperator stepFunction = 
CrossOperator.builder(DummyCrossStub.class).name("StepFunction").
-                               input1(iteration.getPartialSolution()).
-                               input2(sourceB).
-                               build();
-
-               iteration.setNextPartialSolution(stepFunction);
-
-               FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, 
OUT_FILE, iteration, "Sink 3");
-
-               List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-               sinks.add(sink1);
-               sinks.add(sink2);
-               sinks.add(sink3);
-
-               Plan plan = new Plan(sinks);
-
-               try{
-                       compileNoStats(plan);
-               }catch(Exception e){
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-
-       /**
-        * <pre>
-        *       (SRC A)         (SRC B)          (SRC C)
-        *      /       \       /                /       \
-        *  (SINK 1) (DELTA ITERATION)          |     (SINK 2)
-        *             /    |   \               /
-        *         (SINK 3) |   (CROSS => NEXT WORKSET)
-        *                  |             |
-        *                (JOIN => SOLUTION SET DELTA)
-        * </pre>
-        */
-       @Test
-       public void testClosureDeltaIteration() {
-               FileDataSource sourceA = new 
FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-               FileDataSource sourceB = new 
FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-               FileDataSource sourceC = new 
FileDataSource(DummyInputFormat.class, IN_FILE, "Source 3");
-
-               FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, 
OUT_FILE, sourceA, "Sink 1");
-               FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, 
OUT_FILE, sourceC, "Sink 2");
-
-               DeltaIteration iteration = new DeltaIteration(0, "Loop");
-               iteration.setInitialSolutionSet(sourceA);
-               iteration.setInitialWorkset(sourceB);
-               iteration.setMaximumNumberOfIterations(10);
-
-               CrossOperator nextWorkset = 
CrossOperator.builder(DummyCrossStub.class).name("Next workset").
-                               input1(iteration.getWorkset()).
-                               input2(sourceC).
-                               build();
-
-               JoinOperator solutionSetDelta = 
JoinOperator.builder(DummyMatchStub.class, LongValue.class,0,0).
-                               name("Next solution set.").
-                               input1(nextWorkset).
-                               input2(iteration.getSolutionSet()).
-                               build();
-
-               iteration.setNextWorkset(nextWorkset);
-               iteration.setSolutionSetDelta(solutionSetDelta);
-
-               FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, 
OUT_FILE, iteration, "Sink 3");
-
-               List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-               sinks.add(sink1);
-               sinks.add(sink2);
-               sinks.add(sink3);
-
-               Plan plan = new Plan(sinks);
-
-               try{
-                       compileNoStats(plan);
-               }catch(Exception e){
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-
-       /**
-        * <pre>
-        *                  +----Iteration-------+
-        *                  |                    |
-        *       /---------< >---------join-----< >---sink
-        *      / (Solution)|           /        |
-        *     /            |          /         |
-        *    /--map-------< >----\   /       /--|
-        *   /     (Workset)|      \ /       /   |
-        * src-map          |     join------/    |
-        *   \              |      /             |
-        *    \             +-----/--------------+
-        *     \                 /
-        *      \--reduce-------/
-        * </pre>
-        */
-       @Test
-       public void testDeltaIterationWithStaticInput() {
-               FileDataSource source = new 
FileDataSource(DummyInputFormat.class, IN_FILE, "source");
-
-               MapOperator mappedSource = 
MapOperator.builder(IdentityMap.class).
-                               input(source).
-                               name("Identity mapped source").
-                               build();
-
-               ReduceOperator reducedSource = 
ReduceOperator.builder(IdentityReduce.class).
-                               input(source).
-                               name("Identity reduce source").
-                               build();
-
-               DeltaIteration iteration = new DeltaIteration(0,"Loop");
-               iteration.setMaximumNumberOfIterations(10);
-               iteration.setInitialSolutionSet(source);
-               iteration.setInitialWorkset(mappedSource);
-
-               JoinOperator nextWorkset = 
JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,0).
-                               input1(iteration.getWorkset()).
-                               input2(reducedSource).
-                               name("Next work set").
-                               build();
-
-               JoinOperator solutionSetDelta = 
JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,
-                               0).
-                               input1(iteration.getSolutionSet()).
-                               input2(nextWorkset).
-                               name("Solution set delta").
-                               build();
-
-               iteration.setNextWorkset(nextWorkset);
-               iteration.setSolutionSetDelta(solutionSetDelta);
-
-               FileDataSink sink = new FileDataSink(DummyOutputFormat.class, 
OUT_FILE, iteration, "Iteration sink");
-               List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-               sinks.add(sink);
-
-               Plan plan = new Plan(sinks);
-
-               try{
-                       compileNoStats(plan);
-               }catch(Exception e){
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-
-       /**
-        * <pre>
-        *             +---------Iteration-------+
-        *             |                         |
-        *    /--map--< >----\                   |
-        *   /         |      \         /-------< >---sink
-        * src-map     |     join------/         |
-        *   \         |      /                  |
-        *    \        +-----/-------------------+
-        *     \            /
-        *      \--reduce--/
-        * </pre>
-        */
-       @Test
-       public void testIterationWithStaticInput() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(100);
-
-                       DataSet<Long> source = env.generateSequence(1, 1000000);
-
-                       DataSet<Long> mapped = source.map(new 
IdentityMapper<Long>());
-
-                       DataSet<Long> reduced = source.groupBy(new 
IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());
-
-                       IterativeDataSet<Long> iteration = mapped.iterate(10);
-                       iteration.closeWith(
-                                       iteration.join(reduced)
-                                                       .where(new 
IdentityKeyExtractor<Long>())
-                                                       .equalTo(new 
IdentityKeyExtractor<Long>())
-                                                       .with(new 
DummyFlatJoinFunction<Long>()))
-                                       .output(new 
DiscardingOutputFormat<Long>());
-
-                       compileNoStats(env.createProgramPlan());
-               }
-               catch(Exception e){
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBranchingBroadcastVariable() {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setDegreeOfParallelism(100);
-
-               DataSet<String> input1 = 
env.readTextFile(IN_FILE).name("source1");
-               DataSet<String> input2 = 
env.readTextFile(IN_FILE).name("source2");
-               DataSet<String> input3 = 
env.readTextFile(IN_FILE).name("source3");
-               
-               DataSet<String> result1 = input1
-                               .map(new IdentityMapper<String>())
-                               .reduceGroup(new Top1GroupReducer<String>())
-                                       .withBroadcastSet(input3, "bc");
-               
-               DataSet<String> result2 = input2
-                               .map(new IdentityMapper<String>())
-                               .reduceGroup(new Top1GroupReducer<String>())
-                                       .withBroadcastSet(input3, "bc");
-               
-               result1.join(result2)
-                               .where(new IdentityKeyExtractor<String>())
-                               .equalTo(new IdentityKeyExtractor<String>())
-                               .with(new RichJoinFunction<String, String, 
String>() {
-                                       @Override
-                                       public String join(String first, String 
second) {
-                                               return null;
-                                       }
-                               })
-                               .withBroadcastSet(input3, "bc1")
-                               .withBroadcastSet(input1, "bc2")
-                               .withBroadcastSet(result1, "bc3")
-                       .print();
-               
-               Plan plan = env.createProgramPlan();
-               
-               try{
-                       compileNoStats(plan);
-               }catch(Exception e){
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBCVariableClosure() {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               DataSet<String> input = 
env.readTextFile(IN_FILE).name("source1");
-               
-               DataSet<String> reduced = input
-                               .map(new IdentityMapper<String>())
-                               .reduceGroup(new Top1GroupReducer<String>());
-               
-               
-               DataSet<String> initialSolution = input.map(new 
IdentityMapper<String>()).withBroadcastSet(reduced, "bc");
-               
-               
-               IterativeDataSet<String> iteration = 
initialSolution.iterate(100);
-               
-               iteration.closeWith(iteration.map(new 
IdentityMapper<String>()).withBroadcastSet(reduced, "red"))
-                               .print();
-               
-               Plan plan = env.createProgramPlan();
-               
-               try{
-                       compileNoStats(plan);
-               }catch(Exception e){
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testMultipleIterations() {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setDegreeOfParallelism(100);
-               
-               DataSet<String> input = 
env.readTextFile(IN_FILE).name("source1");
-               
-               DataSet<String> reduced = input
-                               .map(new IdentityMapper<String>())
-                               .reduceGroup(new Top1GroupReducer<String>());
-                       
-               IterativeDataSet<String> iteration1 = input.iterate(100);
-               IterativeDataSet<String> iteration2 = input.iterate(20);
-               IterativeDataSet<String> iteration3 = input.iterate(17);
-               
-               iteration1.closeWith(iteration1.map(new 
IdentityMapper<String>()).withBroadcastSet(reduced, "bc1")).print();
-               iteration2.closeWith(iteration2.reduceGroup(new 
Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2")).print();
-               iteration3.closeWith(iteration3.reduceGroup(new 
IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3")).print();
-               
-               Plan plan = env.createProgramPlan();
-               
-               try{
-                       compileNoStats(plan);
-               }catch(Exception e){
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testMultipleIterationsWithClosueBCVars() {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setDegreeOfParallelism(100);
-
-               DataSet<String> input = 
env.readTextFile(IN_FILE).name("source1");
-                       
-               IterativeDataSet<String> iteration1 = input.iterate(100);
-               IterativeDataSet<String> iteration2 = input.iterate(20);
-               IterativeDataSet<String> iteration3 = input.iterate(17);
-               
-               
-               iteration1.closeWith(iteration1.map(new 
IdentityMapper<String>())).print();
-               iteration2.closeWith(iteration2.reduceGroup(new 
Top1GroupReducer<String>())).print();
-               iteration3.closeWith(iteration3.reduceGroup(new 
IdentityGroupReducer<String>())).print();
-               
-               Plan plan = env.createProgramPlan();
-               
-               try{
-                       compileNoStats(plan);
-               }catch(Exception e){
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBranchesOnlyInBCVariables1() {
-               try{
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(100);
-
-                       DataSet<Long> input = env.generateSequence(1, 10);
-                       DataSet<Long> bc_input = env.generateSequence(1, 10);
-                       
-                       input
-                               .map(new 
IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
-                               .map(new 
IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
-                               .print();
-                       
-                       Plan plan = env.createProgramPlan();
-                       compileNoStats(plan);
-               }
-               catch(Exception e){
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBranchesOnlyInBCVariables2() {
-               try{
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(100);
-
-                       DataSet<Tuple2<Long, Long>> input = 
env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
-                       
-                       DataSet<Long> bc_input1 = env.generateSequence(1, 
10).name("BC input 1");
-                       DataSet<Long> bc_input2 = env.generateSequence(1, 
10).name("BC input 1");
-                       
-                       DataSet<Tuple2<Long, Long>> joinInput1 =
-                                       input.map(new 
IdentityMapper<Tuple2<Long,Long>>())
-                                               
.withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
-                                               .withBroadcastSet(bc_input2, 
"bc2");
-                       
-                       DataSet<Tuple2<Long, Long>> joinInput2 =
-                                       input.map(new 
IdentityMapper<Tuple2<Long,Long>>())
-                                               .withBroadcastSet(bc_input1, 
"bc1")
-                                               .withBroadcastSet(bc_input2, 
"bc2");
-                       
-                       DataSet<Tuple2<Long, Long>> joinResult = joinInput1
-                               .join(joinInput2, 
JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
-                               .with(new 
DummyFlatJoinFunction<Tuple2<Long,Long>>());
-                       
-                       input
-                               .map(new IdentityMapper<Tuple2<Long,Long>>())
-                                       .withBroadcastSet(bc_input1, "bc1")
-                               .union(joinResult)
-                               .print();
-                       
-                       Plan plan = env.createProgramPlan();
-                       compileNoStats(plan);
-               }
-               catch(Exception e){
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       private static final class Duplicator<T> implements MapFunction<T, 
Tuple2<T, T>> {
-               
-               @Override
-               public Tuple2<T, T> map(T value) {
-                       return new Tuple2<T, T>(value, value);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
deleted file mode 100644
index c7ad2da..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-
-@SuppressWarnings("serial")
-public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
-
-       @Test
-       public void testNoBreakerForIndependentVariable() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<String> source1 = env.fromElements("test");
-                       DataSet<String> source2 = env.fromElements("test");
-                       
-                       source1.map(new 
IdentityMapper<String>()).withBroadcastSet(source2, "some name").print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode mapper = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       
-                       assertEquals(TempMode.NONE, 
mapper.getInput().getTempMode());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-        @Test
-       public void testBreakerForDependentVariable() {
-                       try {
-                               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<String> source1 = 
env.fromElements("test");
-                               
-                               source1.map(new 
IdentityMapper<String>()).map(new 
IdentityMapper<String>()).withBroadcastSet(source1, "some name").print();
-                               
-                               Plan p = env.createProgramPlan();
-                               OptimizedPlan op = compileNoStats(p);
-                               
-                               SinkPlanNode sink = 
op.getDataSinks().iterator().next();
-                               SingleInputPlanNode mapper = 
(SingleInputPlanNode) sink.getInput().getSource();
-                               
-                               assertEquals(TempMode.PIPELINE_BREAKER, 
mapper.getInput().getTempMode());
-                       }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
deleted file mode 100644
index 3e7da6c..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
-* Tests that validate optimizer choice when using hash joins inside of 
iterations
-*/
-@SuppressWarnings("serial")
-public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
-
-       /**
-        * This tests whether a HYBRIDHASH_BUILD_SECOND is correctly 
transformed to a HYBRIDHASH_BUILD_SECOND_CACHED
-        * when inside of an iteration an on the static path
-        */
-       @Test
-       public void testRightSide() {
-               try {
-                       
-                       Plan plan = 
getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
-                       
-                       OptimizedPlan oPlan = compileNoStats(plan);
-       
-                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
-                       DualInputPlanNode innerJoin = 
resolver.getNode("DummyJoiner");
-                       
-                       // verify correct join strategy
-                       
assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, 
innerJoin.getDriverStrategy()); 
-                       assertEquals(TempMode.NONE, 
innerJoin.getInput1().getTempMode());
-                       assertEquals(TempMode.NONE, 
innerJoin.getInput2().getTempMode());
-               
-                       new JobGraphGenerator().compileJobGraph(oPlan);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test errored: " + e.getMessage());
-               }
-       }
-       
-       /**
-        * This test makes sure that only a HYBRIDHASH on the static path is 
transformed to the cached variant
-        */
-       @Test
-       public void testRightSideCountercheck() {
-               try {
-                       
-                       Plan plan = 
getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
-                       
-                       OptimizedPlan oPlan = compileNoStats(plan);
-       
-                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
-                       DualInputPlanNode innerJoin = 
resolver.getNode("DummyJoiner");
-                       
-                       // verify correct join strategy
-                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, 
innerJoin.getDriverStrategy()); 
-                       assertEquals(TempMode.NONE, 
innerJoin.getInput1().getTempMode());
-                       assertEquals(TempMode.CACHED, 
innerJoin.getInput2().getTempMode());
-               
-                       new JobGraphGenerator().compileJobGraph(oPlan);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test errored: " + e.getMessage());
-               }
-       }
-       
-       /**
-        * This tests whether a HYBRIDHASH_BUILD_FIRST is correctly transformed 
to a HYBRIDHASH_BUILD_FIRST_CACHED
-        * when inside of an iteration an on the static path
-        */
-       @Test
-       public void testLeftSide() {
-               try {
-                       
-                       Plan plan = 
getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
-                       
-                       OptimizedPlan oPlan = compileNoStats(plan);
-       
-                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
-                       DualInputPlanNode innerJoin = 
resolver.getNode("DummyJoiner");
-                       
-                       // verify correct join strategy
-                       
assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, 
innerJoin.getDriverStrategy());
-                       assertEquals(TempMode.NONE, 
innerJoin.getInput1().getTempMode());
-                       assertEquals(TempMode.NONE, 
innerJoin.getInput2().getTempMode());
-               
-                       new JobGraphGenerator().compileJobGraph(oPlan);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test errored: " + e.getMessage());
-               }
-       }
-       
-       /**
-        * This test makes sure that only a HYBRIDHASH on the static path is 
transformed to the cached variant
-        */
-       @Test
-       public void testLeftSideCountercheck() {
-               try {
-                       
-                       Plan plan = 
getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
-                       
-                       OptimizedPlan oPlan = compileNoStats(plan);
-       
-                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
-                       DualInputPlanNode innerJoin = 
resolver.getNode("DummyJoiner");
-                       
-                       // verify correct join strategy
-                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, 
innerJoin.getDriverStrategy());
-                       assertEquals(TempMode.CACHED, 
innerJoin.getInput1().getTempMode());
-                       assertEquals(TempMode.NONE, 
innerJoin.getInput2().getTempMode());
-               
-                       new JobGraphGenerator().compileJobGraph(oPlan);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test errored: " + e.getMessage());
-               }
-       }
-       
-       /**
-        * This test simulates a join of a big left side with a small right 
side inside of an iteration, where the small side is on a static path.
-        * Currently the best execution plan is a 
HYBRIDHASH_BUILD_SECOND_CACHED, where the small side is hashed and cached.
-        * This test also makes sure that all relevant plans are correctly 
enumerated by the optimizer.
-        */
-       @Test
-       public void testCorrectChoosing() {
-               try {
-                       
-                       Plan plan = getTestPlanRightStatic("");
-                       
-                       SourceCollectorVisitor sourceCollector = new 
SourceCollectorVisitor();
-                       plan.accept(sourceCollector);
-                       
-                       for(GenericDataSourceBase<?, ?> s : 
sourceCollector.getSources()) {
-                               if(s.getName().equals("bigFile")) {
-                                       this.setSourceStatistics(s, 10000000, 
1000);
-                               }
-                               else if(s.getName().equals("smallFile")) {
-                                       this.setSourceStatistics(s, 100, 100);
-                               }
-                       }
-                       
-                       
-                       OptimizedPlan oPlan = compileNoStats(plan);
-       
-                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
-                       DualInputPlanNode innerJoin = 
resolver.getNode("DummyJoiner");
-                       
-                       // verify correct join strategy
-                       
assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, 
innerJoin.getDriverStrategy());
-                       assertEquals(TempMode.NONE, 
innerJoin.getInput1().getTempMode());
-                       assertEquals(TempMode.NONE, 
innerJoin.getInput2().getTempMode());
-               
-                       new JobGraphGenerator().compileJobGraph(oPlan);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test errored: " + e.getMessage());
-               }
-       }
-       
-       private Plan getTestPlanRightStatic(String strategy) {
-               
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-               
-               DataSet<Tuple3<Long, Long, Long>> bigInput = 
env.readCsvFile("file://bigFile").types(Long.class, Long.class, 
Long.class).name("bigFile");
-               
-               DataSet<Tuple3<Long, Long, Long>> smallInput = 
env.readCsvFile("file://smallFile").types(Long.class, Long.class, 
Long.class).name("smallFile");
-               
-               IterativeDataSet<Tuple3<Long, Long, Long>> iteration = 
bigInput.iterate(10);
-               
-               Configuration joinStrategy = new Configuration();
-               joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
-               
-               if(strategy != "") {
-                       joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, 
strategy);
-               }
-               
-               DataSet<Tuple3<Long, Long, Long>> inner = 
iteration.join(smallInput).where(0).equalTo(0).with(new 
DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
-
-               DataSet<Tuple3<Long, Long, Long>> output = 
iteration.closeWith(inner);
-               
-               output.print();
-               
-               return env.createProgramPlan();
-               
-       }
-       
-       private Plan getTestPlanLeftStatic(String strategy) {
-               
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-               
-               @SuppressWarnings("unchecked")
-               DataSet<Tuple3<Long, Long, Long>> bigInput = 
env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L),
-                               new Tuple3<Long, Long, Long>(1L, 2L, 3L),new 
Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Big");
-               
-               @SuppressWarnings("unchecked")
-               DataSet<Tuple3<Long, Long, Long>> smallInput = 
env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Small");
-               
-               IterativeDataSet<Tuple3<Long, Long, Long>> iteration = 
bigInput.iterate(10);
-               
-               Configuration joinStrategy = new Configuration();
-               joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
-               
-               DataSet<Tuple3<Long, Long, Long>> inner = 
smallInput.join(iteration).where(0).equalTo(0).with(new 
DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
-
-               DataSet<Tuple3<Long, Long, Long>> output = 
iteration.closeWith(inner);
-               
-               output.print();
-               
-               return env.createProgramPlan();
-               
-       }
-       
-       private static class DummyJoiner extends RichJoinFunction<Tuple3<Long, 
Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
-
-               @Override
-               public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> 
first,
-                               Tuple3<Long, Long, Long> second) throws 
Exception {
-
-                       return first;
-               }
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
deleted file mode 100644
index eba07f1..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Visitor;
-
-@SuppressWarnings("serial")
-public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
-       
-       public static class SimpleCGroup extends 
RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
-               @Override
-               public void coGroup(Iterable<Tuple1<Integer>> first, 
Iterable<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) {}
-       }
-
-       public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, 
Tuple1<Integer>> {
-               @Override
-               public Tuple1<Integer> map(Tuple1<Integer> value) throws 
Exception {
-                       return null;
-               }
-       }
-
-       @Test
-       public void testCoGroupSolutionSet() {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple1<Integer>> raw = 
env.readCsvFile(IN_FILE).types(Integer.class);
-
-               DeltaIteration<Tuple1<Integer>, Tuple1<Integer>> iteration = 
raw.iterateDelta(raw, 1000, 0);
-
-               DataSet<Tuple1<Integer>> test = iteration.getWorkset().map(new 
SimpleMap());
-               DataSet<Tuple1<Integer>> delta = 
iteration.getSolutionSet().coGroup(test).where(0).equalTo(0).with(new 
SimpleCGroup());
-               DataSet<Tuple1<Integer>> feedback = 
iteration.getWorkset().map(new SimpleMap());
-               DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, 
feedback);
-
-               result.print();
-
-               Plan plan = env.createProgramPlan();
-               OptimizedPlan oPlan = null;
-               try {
-                       oPlan = compileNoStats(plan);
-               } catch(CompilerException e) {
-                       Assert.fail(e.getMessage());
-               }
-
-               oPlan.accept(new Visitor<PlanNode>() {
-                       @Override
-                       public boolean preVisit(PlanNode visitable) {
-                               if (visitable instanceof 
WorksetIterationPlanNode) {
-                                       PlanNode deltaNode = 
((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
-
-                                       //get the CoGroup
-                                       DualInputPlanNode dpn = 
(DualInputPlanNode) deltaNode.getInputs().iterator().next().getSource();
-                                       Channel in1 = dpn.getInput1();
-                                       Channel in2 = dpn.getInput2();
-
-                                       
Assert.assertTrue(in1.getLocalProperties().getOrdering() == null);
-                                       
Assert.assertTrue(in2.getLocalProperties().getOrdering() != null);
-                                       
Assert.assertTrue(in2.getLocalProperties().getOrdering().getInvolvedIndexes().contains(0));
-                                       Assert.assertTrue(in1.getShipStrategy() 
== ShipStrategyType.FORWARD);
-                                       Assert.assertTrue(in2.getShipStrategy() 
== ShipStrategyType.PARTITION_HASH);
-                                       return false;
-                               }
-                               return true;
-                       }
-
-                       @Override
-                       public void postVisit(PlanNode visitable) {}
-               });
-       }
-}

Reply via email to