Repository: incubator-flink
Updated Branches:
  refs/heads/master d2f0c4059 -> 92ceacd23


[FLINK-1357] [compiler] Add union between static and dynamic path


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0190dd24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0190dd24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0190dd24

Branch: refs/heads/master
Commit: 0190dd24622169a98be1a6ef518b0fdd018e2d44
Parents: d2f0c40
Author: Stephan Ewen <[email protected]>
Authored: Thu Dec 18 19:58:45 2014 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Jan 6 13:00:40 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java |  72 +++++++---
 .../compiler/plan/BinaryUnionPlanNode.java      |  23 ++-
 .../flink/compiler/plan/NAryUnionPlanNode.java  |   7 +-
 .../UnionBetweenDynamicAndStaticPathTest.java   | 143 +++++++++++++++++++
 .../flink/compiler/UnionReplacementTest.java    |  34 ++---
 .../src/test/resources/log4j.properties         |  27 ++++
 .../flink/runtime/operators/DriverStrategy.java |   5 +-
 .../operators/UnionWithTempOperator.java        | 124 ++++++++--------
 .../UnionStaticDynamicIterationITCase.java      |  55 +++++++
 flink-tests/src/test/resources/log4j.properties |  27 ++++
 10 files changed, 412 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0190dd24/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index a63cfd1..4411d3e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -95,6 +95,7 @@ import org.apache.flink.compiler.plan.WorksetPlanNode;
 import org.apache.flink.compiler.postpass.OptimizerPostPass;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.util.InstantiationUtil;
@@ -580,9 +581,7 @@ public class PactCompiler {
 
                // finalize the plan
                OptimizedPlan plan = new 
PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
-
-               // swap the binary unions for n-ary unions. this changes no 
strategies or memory consumers whatsoever, so
-               // we can do this after the plan finalization
+               
                plan.accept(new BinaryUnionReplacer());
                
                // post pass the plan. this is the phase where the 
serialization and comparator code is set
@@ -1029,7 +1028,6 @@ public class PactCompiler {
                        }
                }
 
-
                @Override
                public void postVisit(OptimizerNode visitable) {}
        }
@@ -1057,8 +1055,11 @@ public class PactCompiler {
        }
        
        /**
-        * Utility class that traverses a plan to collect all nodes and add 
them to the OptimizedPlan.
-        * Besides collecting all nodes, this traversal assigns the memory to 
the nodes.
+        * Finalization of the plan:
+        *  - The graph of nodes is double-linked (links from child to parent 
are inserted)
+        *  - If unions join static and dynamic paths, the cache is marked as a 
memory consumer
+        *  - Relative memory fractions are assigned to all nodes.
+        *  - All nodes are collected into a set.
         */
        private static final class PlanFinalizer implements Visitor<PlanNode> {
                
@@ -1119,9 +1120,7 @@ public class PactCompiler {
                                                        
c.setRelativeTempMemory(relativeMem);
                                                        if 
(LOG.isDebugEnabled()) {
                                                                
LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the 
temp " +
-                                                                               
"table" +
-                                                                               
" " +
-                                                                               
"for " + c + ".");
+                                                                               
"table for " + c + ".");
                                                        }
                                                }
                                        }
@@ -1143,6 +1142,12 @@ public class PactCompiler {
                        else if (visitable instanceof SourcePlanNode) {
                                this.sources.add((SourcePlanNode) visitable);
                        }
+                       else if (visitable instanceof BinaryUnionPlanNode) {
+                               BinaryUnionPlanNode unionNode = 
(BinaryUnionPlanNode) visitable;
+                               if (unionNode.unionsStaticAndDynamicPath()) {
+                                       
unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
+                               }
+                       }
                        else if (visitable instanceof 
BulkPartialSolutionPlanNode) {
                                // tell the partial solution about the 
iteration node that contains it
                                final BulkPartialSolutionPlanNode pspn = 
(BulkPartialSolutionPlanNode) visitable;
@@ -1229,7 +1234,6 @@ public class PactCompiler {
                @Override
                public void postVisit(PlanNode visitable) {}
        }
-
        
        /**
         * A visitor that traverses the graph and collects cascading binary 
unions into a single n-ary
@@ -1256,24 +1260,50 @@ public class PactCompiler {
                public void postVisit(PlanNode visitable) {
                        
                        if (visitable instanceof BinaryUnionPlanNode) {
+                               
                                final BinaryUnionPlanNode unionNode = 
(BinaryUnionPlanNode) visitable;
                                final Channel in1 = unionNode.getInput1();
                                final Channel in2 = unionNode.getInput2();
                        
-                               PlanNode newUnionNode;
+                               if (!unionNode.unionsStaticAndDynamicPath()) {
+                                       
+                                       // both on static path, or both on 
dynamic path. we can collapse them
+                                       NAryUnionPlanNode newUnionNode;
 
-                               List<Channel> inputs = new ArrayList<Channel>();
-                               collect(in1, inputs);
-                               collect(in2, inputs);
+                                       List<Channel> inputs = new 
ArrayList<Channel>();
+                                       collect(in1, inputs);
+                                       collect(in2, inputs);
 
-                               newUnionNode = new 
NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs, 
unionNode.getGlobalProperties());
+                                       newUnionNode = new 
NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs, 
+                                                       
unionNode.getGlobalProperties(), unionNode.getCumulativeCosts());
+                                       
+                                       
newUnionNode.setDegreeOfParallelism(unionNode.getDegreeOfParallelism());
 
-                               for (Channel c : inputs) {
-                                       c.setTarget(newUnionNode);
-                               }
+                                       for (Channel c : inputs) {
+                                               c.setTarget(newUnionNode);
+                                       }
 
-                               for(Channel channel : 
unionNode.getOutgoingChannels()){
-                                       channel.swapUnionNodes(newUnionNode);
+                                       for (Channel channel : 
unionNode.getOutgoingChannels()) {
+                                               
channel.swapUnionNodes(newUnionNode);
+                                               
newUnionNode.addOutgoingChannel(channel);
+                                       }
+                               }
+                               else {
+                                       // union between the static and the 
dynamic path. we need to handle this for now
+                                       // through a special union operator
+                                       
+                                       // make sure that the first input is 
the cached (static) and the second input is the dynamic
+                                       if (in1.isOnDynamicPath()) {
+                                               BinaryUnionPlanNode 
newUnionNode = new BinaryUnionPlanNode(unionNode);
+                                               
+                                               in1.setTarget(newUnionNode);
+                                               in2.setTarget(newUnionNode);
+                                               
+                                               for (Channel channel : 
unionNode.getOutgoingChannels()) {
+                                                       
channel.swapUnionNodes(newUnionNode);
+                                                       
newUnionNode.addOutgoingChannel(channel);
+                                               }
+                                       }
                                }
                        }
                }
@@ -1290,7 +1320,7 @@ public class PactCompiler {
                                
                                inputs.addAll(((NAryUnionPlanNode) 
in.getSource()).getListOfInputs());
                        } else {
-                               // is not a union node, so we take the channel 
directly
+                               // is not a collapsed union node, so we take 
the channel directly
                                inputs.add(in);
                        }
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0190dd24/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BinaryUnionPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BinaryUnionPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BinaryUnionPlanNode.java
index 8e5bb81..039952d 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BinaryUnionPlanNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BinaryUnionPlanNode.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.plan;
 
-
 import org.apache.flink.compiler.dag.BinaryUnionNode;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
@@ -35,7 +33,28 @@ public class BinaryUnionPlanNode extends DualInputPlanNode {
                super(template, "Union", in1, in2, DriverStrategy.UNION);
        }
        
+       public BinaryUnionPlanNode(BinaryUnionPlanNode toSwapFrom) {
+               super(toSwapFrom.getOptimizerNode(), "Union-With-Cached", 
toSwapFrom.getInput2(), toSwapFrom.getInput1(),
+                               DriverStrategy.UNION_WITH_CACHED);
+               
+               this.globalProps = toSwapFrom.globalProps;
+               this.localProps = toSwapFrom.localProps;
+               this.nodeCosts = toSwapFrom.nodeCosts;
+               this.cumulativeCosts = toSwapFrom.cumulativeCosts;
+               
+               setDegreeOfParallelism(toSwapFrom.getDegreeOfParallelism());
+       }
+       
        public BinaryUnionNode getOptimizerNode() {
                return (BinaryUnionNode) this.template;
        }
+       
+       public boolean unionsStaticAndDynamicPath() {
+               return getInput1().isOnDynamicPath() != 
getInput2().isOnDynamicPath();
+       }
+       
+       @Override
+       public int getMemoryConsumerWeight() {
+               return 0;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0190dd24/flink-compiler/src/main/java/org/apache/flink/compiler/plan/NAryUnionPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/NAryUnionPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/NAryUnionPlanNode.java
index b7ed023..a1b1312 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/NAryUnionPlanNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/NAryUnionPlanNode.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.flink.compiler.costs.Costs;
 import org.apache.flink.compiler.dag.BinaryUnionNode;
 import org.apache.flink.compiler.dataproperties.GlobalProperties;
 import org.apache.flink.compiler.dataproperties.LocalProperties;
@@ -40,12 +41,16 @@ public class NAryUnionPlanNode extends PlanNode {
        /**
         * @param template
         */
-       public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> 
inputs, GlobalProperties gProps) {
+       public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> 
inputs, GlobalProperties gProps,
+                       Costs cumulativeCosts)
+       {
                super(template, "Union", DriverStrategy.NONE);
                
                this.inputs = inputs;
                this.globalProps = gProps;
                this.localProps = new LocalProperties();
+               this.nodeCosts = new Costs();
+               this.cumulativeCosts = cumulativeCosts;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0190dd24/flink-compiler/src/test/java/org/apache/flink/compiler/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionBetweenDynamicAndStaticPathTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionBetweenDynamicAndStaticPathTest.java
new file mode 100644
index 0000000..9cb6b2e
--- /dev/null
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionBetweenDynamicAndStaticPathTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.compiler.plan.BinaryUnionPlanNode;
+import org.apache.flink.compiler.plan.BulkIterationPlanNode;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.NAryUnionPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
+
+       @Test
+       public void testUnionStaticFirst() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       DataSet<Long> input1 = env.generateSequence(1, 10);
+                       DataSet<Long> input2 = env.generateSequence(1, 10);
+                       
+                       IterativeDataSet<Long> iteration = input1.iterate(10);
+                       
+                       DataSet<Long> result = iteration.closeWith(
+                                       
input2.union(input2).union(iteration.union(iteration)));
+                               
+                       result.print();
+                       result.print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       assertEquals(2, op.getDataSinks().size());
+                       
+                       BulkIterationPlanNode iterPlan = 
(BulkIterationPlanNode) 
op.getDataSinks().iterator().next().getInput().getSource();
+                       
+                       SingleInputPlanNode noopNode = (SingleInputPlanNode) 
iterPlan.getRootOfStepFunction();
+                       BinaryUnionPlanNode mixedUnion = (BinaryUnionPlanNode) 
noopNode.getInput().getSource();
+                       NAryUnionPlanNode staticUnion = (NAryUnionPlanNode) 
mixedUnion.getInput1().getSource();
+                       NAryUnionPlanNode dynamicUnion = (NAryUnionPlanNode) 
mixedUnion.getInput2().getSource();
+                       
+                       assertTrue(mixedUnion.unionsStaticAndDynamicPath());
+                       assertFalse(mixedUnion.getInput1().isOnDynamicPath());
+                       assertTrue(mixedUnion.getInput2().isOnDynamicPath());
+                       
assertTrue(mixedUnion.getInput1().getTempMode().isCached());
+                       
+                       for (Channel c : staticUnion.getInputs()) {
+                               assertFalse(c.isOnDynamicPath());
+                       }
+                       for (Channel c : dynamicUnion.getInputs()) {
+                               assertTrue(c.isOnDynamicPath());
+                       }
+                       
+                       assertEquals(0.5, 
iterPlan.getRelativeMemoryPerSubTask(), 0.0);
+                       assertEquals(0.5, 
mixedUnion.getInput1().getRelativeTempMemory(), 0.0);
+                       assertEquals(0.0, 
mixedUnion.getInput2().getRelativeTempMemory(), 0.0);
+                       
+                       new NepheleJobGraphGenerator().compileJobGraph(op);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testUnionStaticSecond() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       DataSet<Long> input1 = env.generateSequence(1, 10);
+                       DataSet<Long> input2 = env.generateSequence(1, 10);
+                       
+                       IterativeDataSet<Long> iteration = input1.iterate(10);
+                       
+                       DataSet<Long> iterResult = iteration
+                               
.closeWith(iteration.union(iteration).union(input2.union(input2)));
+                       
+                       iterResult.print();
+                       iterResult.print();
+                       
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       assertEquals(2, op.getDataSinks().size());
+                       
+                       BulkIterationPlanNode iterPlan = 
(BulkIterationPlanNode) 
op.getDataSinks().iterator().next().getInput().getSource();
+                       
+                       SingleInputPlanNode noopNode = (SingleInputPlanNode) 
iterPlan.getRootOfStepFunction();
+                       BinaryUnionPlanNode mixedUnion = (BinaryUnionPlanNode) 
noopNode.getInput().getSource();
+                       NAryUnionPlanNode staticUnion = (NAryUnionPlanNode) 
mixedUnion.getInput1().getSource();
+                       NAryUnionPlanNode dynamicUnion = (NAryUnionPlanNode) 
mixedUnion.getInput2().getSource();
+                       
+                       assertTrue(mixedUnion.unionsStaticAndDynamicPath());
+                       assertFalse(mixedUnion.getInput1().isOnDynamicPath());
+                       assertTrue(mixedUnion.getInput2().isOnDynamicPath());
+                       
assertTrue(mixedUnion.getInput1().getTempMode().isCached());
+                       
+                       assertEquals(0.5, 
iterPlan.getRelativeMemoryPerSubTask(), 0.0);
+                       assertEquals(0.5, 
mixedUnion.getInput1().getRelativeTempMemory(), 0.0);
+                       assertEquals(0.0, 
mixedUnion.getInput2().getRelativeTempMemory(), 0.0);
+                       
+                       for (Channel c : staticUnion.getInputs()) {
+                               assertFalse(c.isOnDynamicPath());
+                       }
+                       for (Channel c : dynamicUnion.getInputs()) {
+                               assertTrue(c.isOnDynamicPath());
+                       }
+                       
+                       new NepheleJobGraphGenerator().compileJobGraph(op);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0190dd24/flink-compiler/src/test/java/org/apache/flink/compiler/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionReplacementTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionReplacementTest.java
index 0ce6468..5cdac1b 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionReplacementTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionReplacementTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.compiler;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
 import org.junit.Test;
@@ -32,24 +31,25 @@ import static org.junit.Assert.fail;
 public class UnionReplacementTest extends CompilerTestBase {
 
        @Test
-       public void testUnionReplacement(){
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<String> input1 = env.fromElements("test1");
-               DataSet<String> input2 = env.fromElements("test2");
-
-               DataSet<String> union = input1.union(input2);
-
-               union.print();
-               union.print();
-
-               Plan plan = env.createProgramPlan();
-               try{
-                       OptimizedPlan oPlan = this.compileNoStats(plan);
+       public void testUnionReplacement() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       DataSet<String> input1 = env.fromElements("test1");
+                       DataSet<String> input2 = env.fromElements("test2");
+       
+                       DataSet<String> union = input1.union(input2);
+       
+                       union.print();
+                       union.print();
+       
+                       Plan plan = env.createProgramPlan();
+                       OptimizedPlan oPlan = compileNoStats(plan);
                        NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
                        jobGen.compileJobGraph(oPlan);
-               }catch(CompilerException co){
-                       co.printStackTrace();
-                       fail("The Pact compiler is unable to compile this plan 
correctly.");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0190dd24/flink-compiler/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/resources/log4j.properties 
b/flink-compiler/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fa3f937
--- /dev/null
+++ b/flink-compiler/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=INFO, console
+
+# Log all infos to the console (System.err)
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target  = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0190dd24/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index a133c6c..ae9b474 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import static org.apache.flink.runtime.operators.DamBehavior.FULL_DAM;
@@ -93,9 +92,9 @@ public enum DriverStrategy {
        NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, 
PIPELINED, 0),
        
        // union utility op. unions happen implicitly on the network layer (in 
the readers) when bundling streams
-       UNION(null, null, FULL_DAM, FULL_DAM, 0);
+       UNION(null, null, PIPELINED, PIPELINED, 0),
        // explicit binary union between a streamed and a cached input
-//     UNION_WITH_CACHED(UnionWithTempOperator.class, null, FULL_DAM, 
PIPELINED, false);
+       UNION_WITH_CACHED(UnionWithTempOperator.class, null, FULL_DAM, 
PIPELINED, 0);
        
        // 
--------------------------------------------------------------------------------------------
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0190dd24/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
index 515b1e3..d8437a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
@@ -16,66 +16,68 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
-//public class UnionWithTempOperator<T> implements PactDriver<Function, T> {
-//     
-//     private PactTaskContext<Function, T> taskContext;
-//     
-//     private volatile boolean running;
-//     
-//     
-//     @Override
-//     public void setup(PactTaskContext<Function, T> context) {
-//             this.taskContext = context;
-//             this.running = true;
-//     }
-//
-//     @Override
-//     public int getNumberOfInputs() {
-//             return 2;
-//     }
-//
-//     @Override
-//     public Class<Function> getStubType() {
-//             return Function.class;
-//     }
-//
-//     @Override
-//     public boolean requiresComparatorOnInput() {
-//             return false;
-//     }
-//
-//     @Override
-//     public void prepare() {}
-//
-//     @Override
-//     public void run() throws Exception {
-//             
-//             final int tempedInput = 0;
-//             final int streamedInput = 1;
-//             
-//             final MutableObjectIterator<T> cache = 
this.taskContext.getInput(tempedInput);
-//             final MutableObjectIterator<T> input = 
this.taskContext.getInput(streamedInput);
-//             
-//             final Collector<T> output = 
this.taskContext.getOutputCollector();
-//
-//             T record = 
this.taskContext.<T>getInputSerializer(streamedInput).createInstance();
-//
-//             while (this.running && ((record = input.next(record)) != null)) 
{
-//                     output.collect(record);
-//             }
-//             while (this.running && ((record = cache.next(record)) != null)) 
{
-//                     output.collect(record);
-//             }
-//     }
-//
-//     @Override
-//     public void cleanup() {}
-//
-//     @Override
-//     public void cancel() {
-//             this.running = false;
-//     }
-//}
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class UnionWithTempOperator<T> implements PactDriver<Function, T> {
+       
+       private static final int CACHED_INPUT = 0;
+       private static final int STREAMED_INPUT = 1;
+       
+       private PactTaskContext<Function, T> taskContext;
+       
+       private volatile boolean running;
+       
+       
+       @Override
+       public void setup(PactTaskContext<Function, T> context) {
+               this.taskContext = context;
+               this.running = true;
+       }
+
+       @Override
+       public int getNumberOfInputs() {
+               return 2;
+       }
+       
+       @Override
+       public int getNumberOfDriverComparators() {
+               return 0;
+       }
+
+       @Override
+       public Class<Function> getStubType() {
+               return null; // no UDF
+       }
+
+       @Override
+       public void prepare() {}
+
+       @Override
+       public void run() throws Exception {
+               
+               final Collector<T> output = 
this.taskContext.getOutputCollector();
+               T record = 
this.taskContext.<T>getInputSerializer(STREAMED_INPUT).getSerializer().createInstance();
+               
+               final MutableObjectIterator<T> input = 
this.taskContext.getInput(STREAMED_INPUT);
+               while (this.running && ((record = input.next(record)) != null)) 
{
+                       output.collect(record);
+               }
+               
+               final MutableObjectIterator<T> cache = 
this.taskContext.getInput(CACHED_INPUT);
+               while (this.running && ((record = cache.next(record)) != null)) 
{
+                       output.collect(record);
+               }
+       }
+
+       @Override
+       public void cleanup() {}
+
+       @Override
+       public void cancel() {
+               this.running = false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0190dd24/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
new file mode 100644
index 0000000..fa8643f
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class UnionStaticDynamicIterationITCase  extends JavaProgramTestBase { // end-to-end test: union of a static data set with the iteration's data set inside an iteration
+       
+       private final ArrayList<Long> result = new ArrayList<Long>(); // receives the job output via the LocalCollectionOutputFormat sink below
+       
+       @Override
+       protected void testProgram() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               DataSet<Long> inputStatic = env.generateSequence(1, 4); // data set that does not depend on the iteration
+               DataSet<Long> inputIteration = env.generateSequence(1, 4); // initial input of the iteration
+               
+               IterativeDataSet<Long> iteration = inputIteration.iterate(3); // presumably 3 is the maximum number of iterations - confirm against iterate()
+               
+               DataSet<Long> result = 
iteration.closeWith(inputStatic.union(inputStatic).union(iteration.union(iteration))); // step result: (static + static) + (iteration + iteration)
+                       
+               result.output(new 
LocalCollectionOutputFormat<Long>(this.result)); // collect the final data set into the field for postSubmit()
+               
+               env.execute();
+       }
+       
+       @Override
+       protected void postSubmit() throws Exception {
+               assertEquals(88, result.size()); // consistent with three steps of n -> 8 + 2n starting at 4: 16, 40, 88 (assumes union keeps duplicates)
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0190dd24/flink-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j.properties 
b/flink-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..6bf344a
--- /dev/null
+++ b/flink-tests/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=INFO, console
+
+# Log all infos to the console (System.err)
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
\ No newline at end of file

Reply via email to