[FLINK-1023] Switch group-at-a-time function to java.lang.Iterable (from 
java.util.Iterator)
Iterables over transient data throw an TraversableOnceException when iterated 
over again.

This closes #84


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/72d7b862
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/72d7b862
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/72d7b862

Branch: refs/heads/release-0.6
Commit: 72d7b86274c33d1570ffb22b1fca2081c15d753c
Parents: 934e4e0
Author: Stephan Ewen <[email protected]>
Authored: Tue Jul 29 15:58:44 2014 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Aug 1 02:33:49 2014 +0200

----------------------------------------------------------------------
 .../api/avro/AvroWithEmptyArrayITCase.java      |   7 +-
 .../mapred/record/example/WordCount.java        |   2 +-
 .../example/WordCountWithOutputFormat.java      |   2 +-
 .../spargel/java/VertexCentricIteration.java    |  42 ++--
 .../spargel/java/record/SpargelIteration.java   |   2 +-
 .../compiler/CoGroupSolutionSetFirstTest.java   |   7 +-
 .../compiler/GroupReduceCompilationTest.java    |  15 +-
 .../flink/compiler/IterationsCompilerTest.java  |   5 +-
 .../WorksetIterationsJavaApiCompilerTest.java   |  10 +-
 .../testfunctions/IdentityGroupReducer.java     |   8 +-
 .../testfunctions/Top1GroupReducer.java         |   7 +-
 .../flink/compiler/util/DummyCoGroupStub.java   |   1 -
 .../flink/compiler/util/IdentityReduce.java     |   1 -
 .../api/common/functions/CoGroupFunction.java   |   9 +-
 .../api/common/functions/CombineFunction.java   |   3 +-
 .../common/functions/FlatCombineFunction.java   |   3 +-
 .../common/functions/GroupReduceFunction.java   |   3 +-
 .../flink/util/TraversableOnceException.java    |  28 +++
 .../example/java/graph/EnumTrianglesBasic.java  |  10 +-
 .../example/java/graph/EnumTrianglesOpt.java    |  12 +-
 .../flink/example/java/graph/PageRankBasic.java |   6 +-
 .../java/graph/TransitiveClosureNaive.java      |   6 +-
 .../example/java/relational/WebLogAnalysis.java |  10 +-
 .../flink/api/java/ExecutionEnvironment.java    |  21 ++
 .../api/java/functions/GroupReduceIterator.java |   4 +-
 .../api/java/functions/RichCoGroupFunction.java |   4 +-
 .../java/functions/RichFlatCombineFunction.java |   4 +-
 .../java/functions/RichGroupReduceFunction.java |   9 +-
 .../api/java/operators/AggregateOperator.java   |   3 +-
 .../api/java/operators/CoGroupOperator.java     |  10 +-
 .../api/java/operators/DistinctOperator.java    |   6 +-
 .../api/java/operators/GroupReduceOperator.java |   5 -
 .../PlanUnwrappingCoGroupOperator.java          |  44 +---
 .../translation/PlanUnwrappingJoinOperator.java |   8 -
 .../PlanUnwrappingReduceGroupOperator.java      |  14 +-
 .../translation/TupleUnwrappingIterator.java    |  15 +-
 .../operators/translation/WrappingFunction.java |   3 +-
 .../java/record/functions/CoGroupFunction.java  |   4 +-
 .../java/record/functions/ReduceFunction.java   |   7 +-
 .../java/record/operators/CoGroupOperator.java  |  61 ++++-
 .../java/record/operators/ReduceOperator.java   | 107 ++++++--
 .../DeltaIterationTranslationTest.java          |   4 +-
 .../record/CoGroupWrappingFunctionTest.java     | 221 +++++++++++++++++
 .../java/record/ReduceWrappingFunctionTest.java | 246 +++++++++++++++++++
 .../java/type/extractor/TypeExtractorTest.java  |   5 +-
 .../javaApiOperators/lambdas/CoGroupITCase.java |  11 +-
 .../lambdas/GroupReduceITCase.java              |   8 +-
 .../CoGroupWithSolutionSetFirstDriver.java      |   6 +-
 .../CoGroupWithSolutionSetSecondDriver.java     |   5 +-
 .../runtime/operators/RegularPactTask.java      |   1 -
 .../sort/CombiningUnilateralSortMerger.java     |  16 +-
 .../operators/sort/FixedLengthRecordSorter.java |   2 +-
 .../runtime/operators/sort/InMemorySorter.java  |   2 +-
 .../runtime/operators/sort/MergeIterator.java   |  17 +-
 .../operators/sort/MergeMatchIterator.java      |   1 -
 .../sort/SortMergeCoGroupIterator.java          |  20 +-
 .../operators/util/CoGroupTaskIterator.java     |   9 +-
 .../flink/runtime/util/EmptyIterator.java       |  11 +-
 .../flink/runtime/util/KeyGroupedIterator.java  |  23 +-
 .../util/MutableToRegularIteratorWrapper.java   |  16 +-
 .../runtime/util/SingleElementIterator.java     |   7 +-
 .../runtime/operators/CachedMatchTaskTest.java  |  18 +-
 .../operators/CoGroupTaskExternalITCase.java    |  19 +-
 .../runtime/operators/CoGroupTaskTest.java      |  30 +--
 .../runtime/operators/CombineTaskTest.java      |  35 ++-
 .../flink/runtime/operators/MapTaskTest.java    |   1 +
 .../operators/ReduceTaskExternalITCase.java     |  35 ++-
 .../flink/runtime/operators/ReduceTaskTest.java |  50 ++--
 .../operators/chaining/ChainTaskTest.java       |  15 +-
 .../drivers/AllGroupReduceDriverTest.java       |  16 +-
 .../drivers/GroupReduceDriverTest.java          |  16 +-
 .../CombiningUnilateralSortMergerITCase.java    |  24 +-
 .../sort/SortMergeCoGroupIteratorITCase.java    |   4 +-
 .../runtime/util/KeyGroupedIteratorTest.java    |  43 +++-
 .../api/scala/functions/ReduceFunction.scala    |   4 +-
 .../api/scala/operators/CoGroupOperator.scala   |   7 +-
 .../api/scala/operators/ReduceOperator.scala    |   5 +-
 .../test/accumulators/AccumulatorITCase.java    |   1 +
 .../AccumulatorIterativeITCase.java             |   8 +-
 .../KMeansIterativeNepheleITCase.java           |   3 +-
 .../BulkIterationWithAllReducerITCase.java      |   8 +-
 .../CoGroupConnectedComponentsITCase.java       |   4 +-
 .../CoGroupConnectedComponentsSecondITCase.java |  12 +-
 .../DependencyConnectedComponentsITCase.java    |  30 +--
 ...IterationTerminationWithTerminationTail.java |   1 -
 .../IterationTerminationWithTwoTails.java       |   2 +-
 .../IterationWithAllReducerITCase.java          |   1 -
 .../iterative/IterationWithChainingITCase.java  | 118 +++++----
 .../iterative/IterationWithUnionITCase.java     |  81 ++----
 ...nentsWithParametrizableAggregatorITCase.java |  35 ++-
 ...entsWithParametrizableConvergenceITCase.java |  20 +-
 .../ConnectedComponentsNepheleITCase.java       |   5 +-
 .../IterationWithChainingNepheleITCase.java     |  11 +-
 .../CustomCompensatableDotProductCoGroup.java   |  11 +-
 .../CustomCompensatingMap.java                  |   1 +
 .../CustomRankCombiner.java                     |   6 +-
 .../CompensatableDotProductCoGroup.java         |   4 +-
 .../test/javaApiOperators/CoGroupITCase.java    |  94 +++----
 .../javaApiOperators/GroupReduceITCase.java     | 123 ++++------
 .../flink/test/operators/CoGroupITCase.java     |  29 +--
 .../flink/test/operators/ReduceITCase.java      |  23 +-
 .../recordJobTests/GroupOrderReduceITCase.java  |   1 -
 .../graph/WorksetConnectedComponents.java       |   2 +-
 103 files changed, 1295 insertions(+), 820 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
index ea9edff..89db1fa 100644
--- 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
+++ 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
@@ -194,15 +194,14 @@ public class AvroWithEmptyArrayITCase extends 
RecordAPITestBase {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void coGroup(Iterator<Record> records1, Iterator<Record> 
records2, Collector<Record> out)
-                               throws Exception {
+               public void coGroup(Iterator<Record> records1, Iterator<Record> 
records2, Collector<Record> out) {
 
                        Record r1 = null;
-                       if (records1.hasNext()) {
+                       while (records1.hasNext()) {
                                r1 = records1.next();
                        }
                        Record r2 = null;
-                       if (records2.hasNext()) {
+                       while (records2.hasNext()) {
                                r2 = records2.next();
                        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
 
b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
index 25caf0c..88b1892 100644
--- 
a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
+++ 
b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.hadoopcompatibility.mapred.record.example;
 
 import java.io.Serializable;
@@ -103,6 +102,7 @@ public class WordCount implements Program, 
ProgramDescription {
                public void reduce(Iterator<Record> records, Collector<Record> 
out) throws Exception {
                        Record element = null;
                        int sum = 0;
+                       
                        while (records.hasNext()) {
                                element = records.next();
                                int cnt = element.getField(1, 
IntValue.class).getValue();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
 
b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
index a3cd3d5..8aaf8e5 100644
--- 
a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
+++ 
b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.hadoopcompatibility.mapred.record.example;
 
 import java.io.Serializable;
@@ -101,6 +100,7 @@ public class WordCountWithOutputFormat implements Program, 
ProgramDescription {
                public void reduce(Iterator<Record> records, Collector<Record> 
out) throws Exception {
                        Record element = null;
                        int sum = 0;
+                       
                        while (records.hasNext()) {
                                element = records.next();
                                int cnt = element.getField(1, 
IntValue.class).getValue();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index 65be2f8..8f9149c 100644
--- 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -413,24 +413,28 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
                }
 
                @Override
-               public void coGroup(Iterator<Tuple2<VertexKey, Message>> 
messages, Iterator<Tuple2<VertexKey, VertexValue>> vertex,
+               public void coGroup(Iterable<Tuple2<VertexKey, Message>> 
messages, Iterable<Tuple2<VertexKey, VertexValue>> vertex,
                                Collector<Tuple2<VertexKey, VertexValue>> out)
                        throws Exception
                {
-                       if (vertex.hasNext()) {
-                               Tuple2<VertexKey, VertexValue> vertexState = 
vertex.next();
+                       final Iterator<Tuple2<VertexKey, VertexValue>> 
vertexIter = vertex.iterator();
+                       
+                       if (vertexIter.hasNext()) {
+                               Tuple2<VertexKey, VertexValue> vertexState = 
vertexIter.next();
                                
                                @SuppressWarnings("unchecked")
-                               Iterator<Tuple2<?, Message>> downcastIter = 
(Iterator<Tuple2<?, Message>>) (Iterator<?>) messages;
+                               Iterator<Tuple2<?, Message>> downcastIter = 
(Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
                                messageIter.setSource(downcastIter);
                                
                                vertexUpdateFunction.setOutput(vertexState, 
out);
                                
vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
-                       } else {
-                               if (messages.hasNext()) {
+                       }
+                       else {
+                               final Iterator<Tuple2<VertexKey, Message>> 
messageIter = messages.iterator();
+                               if (messageIter.hasNext()) {
                                        String message = "Target vertex does 
not exist!.";
                                        try {
-                                               Tuple2<VertexKey, Message> next 
= messages.next();
+                                               Tuple2<VertexKey, Message> next 
= messageIter.next();
                                                message = "Target vertex '" + 
next.f0 + "' does not exist!.";
                                        } catch (Throwable t) {}
                                        throw new Exception(message);
@@ -481,13 +485,15 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
                }
                
                @Override
-               public void coGroup(Iterator<Tuple2<VertexKey, VertexKey>> 
edges,
-                               Iterator<Tuple2<VertexKey, VertexValue>> state, 
Collector<Tuple2<VertexKey, Message>> out)
+               public void coGroup(Iterable<Tuple2<VertexKey, VertexKey>> 
edges,
+                               Iterable<Tuple2<VertexKey, VertexValue>> state, 
Collector<Tuple2<VertexKey, Message>> out)
                        throws Exception
                {
-                       if (state.hasNext()) {
-                               Tuple2<VertexKey, VertexValue> newVertexState = 
state.next();
-                               messagingFunction.set((Iterator<?>) edges, out);
+                       final Iterator<Tuple2<VertexKey, VertexValue>> 
stateIter = state.iterator();
+                       
+                       if (stateIter.hasNext()) {
+                               Tuple2<VertexKey, VertexValue> newVertexState = 
stateIter.next();
+                               messagingFunction.set((Iterator<?>) 
edges.iterator(), out);
                                
messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
                        }
                }
@@ -534,13 +540,15 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
                }
 
                @Override
-               public void coGroup(Iterator<Tuple3<VertexKey, VertexKey, 
EdgeValue>> edges,
-                               Iterator<Tuple2<VertexKey, VertexValue>> state, 
Collector<Tuple2<VertexKey, Message>> out)
+               public void coGroup(Iterable<Tuple3<VertexKey, VertexKey, 
EdgeValue>> edges,
+                               Iterable<Tuple2<VertexKey, VertexValue>> state, 
Collector<Tuple2<VertexKey, Message>> out)
                        throws Exception
                {
-                       if (state.hasNext()) {
-                               Tuple2<VertexKey, VertexValue> newVertexState = 
state.next();
-                               messagingFunction.set((Iterator<?>) edges, out);
+                       final Iterator<Tuple2<VertexKey, VertexValue>> 
stateIter = state.iterator();
+                       
+                       if (stateIter.hasNext()) {
+                               Tuple2<VertexKey, VertexValue> newVertexState = 
stateIter.next();
+                               messagingFunction.set((Iterator<?>) 
edges.iterator(), out);
                                
messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
                        }
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
index 3a58afc..96bc799 100644
--- 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
+++ 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
@@ -161,7 +161,7 @@ public class SpargelIteration {
 
                @Override
                public void coGroup(Iterator<Record> messages, Iterator<Record> 
vertex, Collector<Record> out) throws Exception {
-
+                       
                        if (vertex.hasNext()) {
                                Record first = vertex.next();
                                first.getFieldInto(0, vertexKey);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
index 3624d86..2a4d6d8 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.java.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.junit.Assert;
@@ -30,6 +27,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -44,8 +42,7 @@ public class CoGroupSolutionSetFirstTest extends 
CompilerTestBase {
        
        public static class SimpleCGroup extends 
RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
                @Override
-               public void coGroup(Iterator<Tuple1<Integer>> first, 
Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws 
Exception {
-               }
+               public void coGroup(Iterable<Tuple1<Integer>> first, 
Iterable<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) {}
        }
 
        public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, 
Tuple1<Integer>> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
index b9cc769..9f63683 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
@@ -51,7 +48,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 
0.5).name("source");
                        
                        data.reduceGroup(new RichGroupReduceFunction<Double, 
Double>() {
-                               public void reduce(Iterator<Double> values, 
Collector<Double> out) {}
+                               public void reduce(Iterable<Double> values, 
Collector<Double> out) {}
                        }).name("reducer")
                        .print().name("sink");
                        
@@ -95,7 +92,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        DataSet<Long> data = env.generateSequence(1, 
8000000).name("source");
                        
                        GroupReduceOperator<Long, Long> reduced = 
data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
-                               public void reduce(Iterator<Long> values, 
Collector<Long> out) {}
+                               public void reduce(Iterable<Long> values, 
Collector<Long> out) {}
                        }).name("reducer");
                        
                        reduced.setCombinable(true);
@@ -148,7 +145,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        data
                                .groupBy(1)
                                .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-                               public void reduce(Iterator<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
+                               public void reduce(Iterable<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
                        }).name("reducer")
                        .print().name("sink");
                        
@@ -197,7 +194,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        GroupReduceOperator<Tuple2<String, Double>, 
Tuple2<String, Double>> reduced = data
                                        .groupBy(1)
                                        .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-                               public void reduce(Iterator<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
+                               public void reduce(Iterable<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
                        }).name("reducer");
                        
                        reduced.setCombinable(true);
@@ -256,7 +253,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                                        public String getKey(Tuple2<String, 
Double> value) { return value.f0; }
                                })
                                .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-                               public void reduce(Iterator<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
+                               public void reduce(Iterable<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
                        }).name("reducer")
                        .print().name("sink");
                        
@@ -314,7 +311,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                                        public String getKey(Tuple2<String, 
Double> value) { return value.f0; }
                                })
                                .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-                               public void reduce(Iterator<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
+                               public void reduce(Iterable<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
                        }).name("reducer");
                        
                        reduced.setCombinable(true);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 8fc4324..828a635 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler;
 
 import static org.junit.Assert.*;
@@ -24,8 +23,6 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.junit.Test;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
@@ -287,7 +284,7 @@ public class IterationsCompilerTest extends 
CompilerTestBase {
        public static final class Reduce101 extends 
RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
                
                @Override
-               public void reduce(Iterator<Tuple1<Long>> values, 
Collector<Tuple1<Long>> out) {}
+               public void reduce(Iterable<Tuple1<Long>> values, 
Collector<Tuple1<Long>> out) {}
        }
        
        @ConstantFields("0")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index 64a4791..62d5d9b 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -16,17 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
@@ -214,7 +212,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                        iter.getWorkset().join(invariantInput)
                                .where(1, 2)
                                .equalTo(1, 2)
-                               .with(new 
RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, 
Tuple3<Long,Long,Long>>() {
+                               .with(new JoinFunction<Tuple3<Long,Long,Long>, 
Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
                                        public Tuple3<Long, Long, Long> 
join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
                                                return first;
                                        }
@@ -224,7 +222,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                        result.join(iter.getSolutionSet())
                                .where(1, 0)
                                .equalTo(0, 2)
-                               .with(new RichJoinFunction<Tuple3<Long, Long, 
Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
+                               .with(new JoinFunction<Tuple3<Long, Long, 
Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
                                        public Tuple3<Long, Long, Long> 
join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
                                                return second;
                                        }
@@ -283,7 +281,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                        
                DataSet<Tuple3<Long, Long, Long>> nextWorkset = 
joinedWithSolutionSet.groupBy(1, 2)
                        .reduceGroup(new 
RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
-                               public void reduce(Iterator<Tuple3<Long, Long, 
Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
+                               public void reduce(Iterable<Tuple3<Long, Long, 
Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
                        })
                        .name(NEXT_WORKSET_REDUCER_NAME)
                        .withConstantSet("1->1","2->2","0->0");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
index 42275af..e06e3ba 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.testfunctions;
 
-import java.util.Iterator;
 
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
@@ -32,9 +30,9 @@ public class IdentityGroupReducer<T> extends 
RichGroupReduceFunction<T, T> {
        private static final long serialVersionUID = 1L;
 
        @Override
-       public void reduce(Iterator<T> values, Collector<T> out) {
-               while (values.hasNext()) {
-                       out.collect(values.next());
+       public void reduce(Iterable<T> values, Collector<T> out) {
+               for (T next : values) {
+                       out.collect(next);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
index 3f24e65..b1a0e2d 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.testfunctions;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
@@ -32,7 +29,7 @@ public class Top1GroupReducer<T> extends 
RichGroupReduceFunction<T, T> {
        private static final long serialVersionUID = 1L;
 
        @Override
-       public void reduce(Iterator<T> values, Collector<T> out) {
-               out.collect(values.next());
+       public void reduce(Iterable<T> values, Collector<T> out) {
+               out.collect(values.iterator().next());
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
index 6ef1651..13cd37b 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.util;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
index 3f32423..b78a850 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.util;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
index cc8fe78..1a0abeb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
-import java.util.Iterator;
 
 import org.apache.flink.util.Collector;
 
@@ -41,11 +40,11 @@ import org.apache.flink.util.Collector;
  * Some keys may only be contained in one of the two original data sets. In 
that case, the CoGroup function is invoked
  * with in empty input for the side of the data set that did not contain 
elements with that specific key.
  * 
- * @param <V1> The data type of the first input data set.
- * @param <V2> The data type of the second input data set.
+ * @param <IN1> The data type of the first input data set.
+ * @param <IN2> The data type of the second input data set.
  * @param <O> The data type of the returned elements.
  */
-public interface CoGroupFunction<V1, V2, O> extends Function, Serializable {
+public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
        
        /**
         * This method must be implemented to provide a user implementation of a
@@ -59,5 +58,5 @@ public interface CoGroupFunction<V1, V2, O> extends Function, 
Serializable {
         * @throws Exception The function may throw Exceptions, which will 
cause the program to cancel,
         *                   and may trigger the recovery logic.
         */
-       void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) 
throws Exception;
+       public void coGroup(Iterable<IN1> first, Iterable<IN2> second, 
Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index aed7fae..0588526 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
-import java.util.Iterator;
 
 /**
  * Generic interface used for combine functions ("combiners"). Combiners act 
as auxiliaries to a {@link GroupReduceFunction}
@@ -45,5 +44,5 @@ public interface CombineFunction<T> extends Function, 
Serializable {
         * @throws Exception The function may throw Exceptions, which will 
cause the program to cancel,
         *                   and may trigger the recovery logic.
         */
-       T combine(Iterator<T> values) throws Exception;
+       T combine(Iterable<T> values) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
index edf1614..bbcbd0a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
-import java.util.Iterator;
 
 import org.apache.flink.util.Collector;
 
@@ -47,5 +46,5 @@ public interface FlatCombineFunction<T> extends Function, 
Serializable {
         * @throws Exception The function may throw Exceptions, which will 
cause the program to cancel,
         *                   and may trigger the recovery logic.
         */
-       void combine(Iterator<T> values, Collector<T> out) throws Exception;
+       void combine(Iterable<T> values, Collector<T> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
index 5fa959c..54cc5c7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
-import java.util.Iterator;
 
 import org.apache.flink.util.Collector;
 
@@ -53,5 +52,5 @@ public interface GroupReduceFunction<T, O> extends Function, 
Serializable {
         * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
         *                   to fail and may trigger recovery.
         */
-       void reduce(Iterator<T> values, Collector<O> out) throws Exception;
+       void reduce(Iterable<T> values, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java 
b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
new file mode 100644
index 0000000..a15c31c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+public class TraversableOnceException extends RuntimeException {
+
+       private static final long serialVersionUID = 7636881584773577290L;
+
+       public TraversableOnceException() {
+               super("The Iterable can be iterated over only once. Only the 
first call to 'iterator()' will succeed.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
index 2d794bd..3d68e99 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
@@ -35,7 +35,7 @@ import 
org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Edge;
 import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Triad;
 
 /**
- * Triangle enumeration is a preprocessing step to find closely connected 
parts in graphs.
+ * Triangle enumeration is a pre-processing step to find closely connected 
parts in graphs.
  * A triangle consists of three edges that connect three vertices with each 
other.
  * 
  * <p>
@@ -154,7 +154,9 @@ public class EnumTrianglesBasic {
                private final Triad outTriad = new Triad();
                
                @Override
-               public void reduce(Iterator<Edge> edges, Collector<Triad> out) 
throws Exception {
+               public void reduce(Iterable<Edge> edgesIter, Collector<Triad> 
out) throws Exception {
+                       
+                       final Iterator<Edge> edges = edgesIter.iterator();
                        
                        // clear vertex list
                        vertices.clear();
@@ -165,11 +167,11 @@ public class EnumTrianglesBasic {
                        vertices.add(firstEdge.getSecondVertex());
                        
                        // build and emit triads
-                       while(edges.hasNext()) {
+                       while (edges.hasNext()) {
                                Integer higherVertexId = 
edges.next().getSecondVertex();
                                
                                // combine vertex with all previously read 
vertices
-                               for(Integer lowerVertexId : vertices) {
+                               for (Integer lowerVertexId : vertices) {
                                        outTriad.setSecondVertex(lowerVertexId);
                                        outTriad.setThirdVertex(higherVertexId);
                                        out.collect(outTriad);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
index c0ea26a..efccb59 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
@@ -38,7 +38,7 @@ import 
org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.EdgeWithD
 import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Triad;
 
 /**
- * Triangle enumeration is a preprocessing step to find closely connected 
parts in graphs.
+ * Triangle enumeration is a pre-processing step to find closely connected 
parts in graphs.
  * A triangle consists of three edges that connect three vertices with each 
other.
  * 
  * <p>
@@ -166,8 +166,9 @@ public class EnumTrianglesOpt {
                final EdgeWithDegrees outputEdge = new EdgeWithDegrees();
                
                @Override
-               public void reduce(Iterator<Edge> edges, 
Collector<EdgeWithDegrees> out) throws Exception {
+               public void reduce(Iterable<Edge> edgesIter, 
Collector<EdgeWithDegrees> out) {
                        
+                       Iterator<Edge> edges = edgesIter.iterator();
                        otherVertices.clear();
                        
                        // get first edge
@@ -176,7 +177,7 @@ public class EnumTrianglesOpt {
                        this.otherVertices.add(edge.getSecondVertex());
                        
                        // get all other edges (assumes edges are sorted by 
second vertex)
-                       while(edges.hasNext()) {
+                       while (edges.hasNext()) {
                                edge = edges.next();
                                Integer otherVertex = edge.getSecondVertex();
                                // collect unique vertices
@@ -274,7 +275,8 @@ public class EnumTrianglesOpt {
                private final Triad outTriad = new Triad();
                
                @Override
-               public void reduce(Iterator<Edge> edges, Collector<Triad> out) 
throws Exception {
+               public void reduce(Iterable<Edge> edgesIter, Collector<Triad> 
out) throws Exception {
+                       final Iterator<Edge> edges = edgesIter.iterator();
                        
                        // clear vertex list
                        vertices.clear();
@@ -285,7 +287,7 @@ public class EnumTrianglesOpt {
                        vertices.add(firstEdge.getSecondVertex());
                        
                        // build and emit triads
-                       while(edges.hasNext()) {
+                       while (edges.hasNext()) {
                                Integer higherVertexId = 
edges.next().getSecondVertex();
                                
                                // combine vertex with all previously read 
vertices

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
index ba9754f..e6a9272 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
@@ -21,7 +21,6 @@ package org.apache.flink.example.java.graph;
 import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -163,12 +162,11 @@ public class PageRankBasic {
                private final ArrayList<Long> neighbors = new ArrayList<Long>();
                
                @Override
-               public void reduce(Iterator<Tuple2<Long, Long>> values, 
Collector<Tuple2<Long, Long[]>> out) {
+               public void reduce(Iterable<Tuple2<Long, Long>> values, 
Collector<Tuple2<Long, Long[]>> out) {
                        neighbors.clear();
                        Long id = 0L;
                        
-                       while (values.hasNext()) {
-                               Tuple2<Long, Long> n = values.next();
+                       for (Tuple2<Long, Long> n : values) {
                                id = n.f0;
                                neighbors.add(n.f1);
                        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
index 22054da..e9ba406 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.example.java.graph.util.ConnectedComponentsData;
@@ -70,8 +68,8 @@ public class TransitiveClosureNaive implements 
ProgramDescription {
                                .groupBy(0, 1)
                                .reduceGroup(new 
GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                                        @Override
-                                       public void 
reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) 
throws Exception {
-                                               out.collect(values.next());
+                                       public void 
reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) 
throws Exception {
+                                               
out.collect(values.iterator().next());
                                        }
                                });
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
index 3033c0d..9ca6ea9 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.example.java.relational;
 
-import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
@@ -249,12 +247,12 @@ public class WebLogAnalysis {
                 * 2: AVG_DURATION
                 */
                @Override
-               public void coGroup(Iterator<Tuple3<Integer, String, Integer>> 
ranks, Iterator<Tuple1<String>> visits, Collector<Tuple3<Integer, String, 
Integer>> out) {
+               public void coGroup(Iterable<Tuple3<Integer, String, Integer>> 
ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, 
Integer>> out) {
                        // Check if there is a entry in the visits relation
-                       if (!visits.hasNext()) {
-                               while (ranks.hasNext()) {
+                       if (!visits.iterator().hasNext()) {
+                               for (Tuple3<Integer, String, Integer> next : 
ranks) {
                                        // Emit all rank pairs
-                                       out.collect(ranks.next());
+                                       out.collect(next);
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index af0ea52..bcc523d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
@@ -253,6 +254,26 @@ public abstract class ExecutionEnvironment {
        public CsvReader readCsvFile(String filePath) {
                return new CsvReader(filePath, this);
        }
+
+       // ------------------------------------ File Input Format 
-----------------------------------------
+       
+       public <X> DataSource<X> readFile(FileInputFormat<X> inputFormat, 
String filePath) {
+               if (inputFormat == null) {
+                       throw new IllegalArgumentException("InputFormat must 
not be null.");
+               }
+               if (filePath == null) {
+                       throw new IllegalArgumentException("The file path must 
not be null.");
+               }
+               
+               inputFormat.setFilePath(new Path(filePath));
+               try {
+                       return createInput(inputFormat, 
TypeExtractor.getInputFormatTypes(inputFormat));
+               }
+               catch (Exception e) {
+                       throw new InvalidProgramException("The type returned by 
the input format could not be automatically determined. " +
+                                       "Please specify the TypeInformation of 
the produced type explicitly.");
+               }
+       }
        
        // ----------------------------------- Generic Input Format 
---------------------------------------
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
index b363606..087808d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
@@ -28,13 +28,13 @@ public abstract class GroupReduceIterator<IN, OUT> extends 
RichGroupReduceFuncti
        private static final long serialVersionUID = 1L;
 
 
-       public abstract Iterator<OUT> reduceGroup(Iterator<IN> values) throws 
Exception;
+       public abstract Iterator<OUT> reduceGroup(Iterable<IN> values) throws 
Exception;
        
        
        // 
-------------------------------------------------------------------------------------------
        
        @Override
-       public final void reduce(Iterator<IN> values, Collector<OUT> out) 
throws Exception {
+       public final void reduce(Iterable<IN> values, Collector<OUT> out) 
throws Exception {
                for (Iterator<OUT> iter = reduceGroup(values); iter.hasNext(); 
) {
                        out.collect(iter.next());
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
index e78c31e..3169622 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.functions;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichFunction;
@@ -40,6 +38,6 @@ public abstract class RichCoGroupFunction<IN1, IN2, OUT> 
extends AbstractRichFun
        private static final long serialVersionUID = 1L;
        
        @Override
-       public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, 
Collector<OUT> out) throws Exception;
+       public abstract void coGroup(Iterable<IN1> first, Iterable<IN2> second, 
Collector<OUT> out) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
index 97c15ff..a5d45aa 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
@@ -24,8 +24,6 @@ import 
org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
-import java.util.Iterator;
-
 /**
  * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, 
it gives access to the
  * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
@@ -39,5 +37,5 @@ public abstract class RichFlatCombineFunction<T> extends 
AbstractRichFunction im
        private static final long serialVersionUID = 1L;
 
        @Override
-       public abstract void combine(Iterator<T> values, Collector<T> out) 
throws Exception;
+       public abstract void combine(Iterable<T> values, Collector<T> out) 
throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
index 801f592..9198aeb 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
@@ -22,7 +22,6 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
@@ -44,7 +43,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> 
extends AbstractRichFunct
        private static final long serialVersionUID = 1L;
 
        @Override
-       public abstract void reduce(Iterator<IN> values, Collector<OUT> out) 
throws Exception;
+       public abstract void reduce(Iterable<IN> values, Collector<OUT> out) 
throws Exception;
        
        /**
         * The combine methods pre-reduces elements. It may be called on 
subsets of the data
@@ -59,7 +58,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> 
extends AbstractRichFunct
         * <p>
         * Since the reduce function will be called on the result of this 
method, it is important that this
         * method returns the same data type as it consumes. By default, this 
method only calls the
-        * {@link #reduce(Iterator, Collector)} method. If the behavior in the 
pre-reducing is different
+        * {@link #reduce(Iterable, Collector)} method. If the behavior in the 
pre-reducing is different
         * from the final reduce function (for example because the reduce 
function changes the data type),
         * this method must be overwritten, or the execution will fail.
         * 
@@ -70,7 +69,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> 
extends AbstractRichFunct
         *                   to fail and may trigger recovery.
         */
        @Override
-       public void combine(Iterator<IN> values, Collector<IN> out) throws 
Exception {
+       public void combine(Iterable<IN> values, Collector<IN> out) throws 
Exception {
                @SuppressWarnings("unchecked")
                Collector<OUT> c = (Collector<OUT>) out;
                reduce(values, c);
@@ -80,7 +79,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> 
extends AbstractRichFunct
        
        /**
         * This annotation can be added to classes that extend {@link 
RichGroupReduceFunction}, in oder to mark
-        * them as "combinable". The system may call the {@link 
RichGroupReduceFunction#combine(Iterator, Collector)}
+        * them as "combinable". The system may call the {@link 
RichGroupReduceFunction#combine(Iterable, Collector)}
         * method on such functions, to pre-reduce the data before transferring 
it over the network to
         * the actual group reduce operation.
         * <p>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 80a5fa0..1ceb8c8 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -271,13 +271,14 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
                }
                
                @Override
-               public void reduce(Iterator<T> values, Collector<T> out) {
+               public void reduce(Iterable<T> records, Collector<T> out) {
                        final AggregationFunction<Object>[] aggFunctions = 
this.aggFunctions;
                        final int[] fieldPositions = this.fieldPositions;
 
                        // aggregators are initialized from before
                        
                        T current = null;
+                       final Iterator<T> values = records.iterator();
                        while (values.hasNext()) {
                                current = values.next();
                                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 744893b..a890b32 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -301,7 +301,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
        /**
         * Intermediate step of a CoGroup transformation. <br/>
         * To continue the CoGroup transformation, select the grouping key of 
the first input {@link DataSet} by calling 
-        * {@link CoGroupOperatorSets#where()} or {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(KeySelector)}.
+        * {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(int,
 int...)} or {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(KeySelector)}.
         *
         * @param <I1> The type of the first input DataSet of the CoGroup 
transformation.
         * @param <I2> The type of the second input DataSet of the CoGroup 
transformation.
@@ -328,7 +328,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                 * @param field0 The first index of the Tuple fields of the 
first co-grouped DataSets that should be used as key
                 * @param fields The indexes of the Tuple fields of the first 
co-grouped DataSets that should be used as keys.
                 * @return An incomplete CoGroup transformation. 
-                *           Call {@link 
CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup.
+                *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int,
 int...)} to continue the CoGroup.
                 * 
                 * @see Tuple
                 * @see DataSet
@@ -348,7 +348,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                 * @param field0 The first field of the Tuple fields of the 
first co-grouped DataSets that should be used as key
                 * @param fields The  fields of the first co-grouped DataSets 
that should be used as keys.
                 * @return An incomplete CoGroup transformation.
-                *           Call {@link 
CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup.
+                *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int,
 int...)} to continue the CoGroup.
                 *
                 * @see Tuple
                 * @see DataSet
@@ -367,7 +367,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                 * 
                 * @param keyExtractor The KeySelector function which extracts 
the key values from the DataSet on which it is grouped.
                 * @return An incomplete CoGroup transformation. 
-                *           Call {@link 
CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup. 
+                *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int,
 int...)} to continue the CoGroup. 
                 * 
                 * @see KeySelector
                 * @see DataSet
@@ -381,7 +381,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                /**
                 * Intermediate step of a CoGroup transformation. <br/>
                 * To continue the CoGroup transformation, select the grouping 
key of the second input {@link DataSet} by calling 
-                * {@link CoGroupOperatorSetsPredicate#equalTo(int...)} or 
{@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(KeySelector)}.
+                * {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int,
 int...)} or {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(KeySelector)}.
                 *
                 */
                public final class CoGroupOperatorSetsPredicate {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 7646fa0..fd35773 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.operators;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -148,8 +146,8 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void reduce(Iterator<T> values, Collector<T> out) {
-                       out.collect(values.next());
+               public void reduce(Iterable<T> values, Collector<T> out) {
+                       out.collect(values.iterator().next());
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index e1424ad..7b0ad4d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -37,7 +37,6 @@ import org.apache.flink.types.TypeInformation;
 
 import org.apache.flink.api.java.DataSet;
 
-
 /**
  * This operator represents the application of a "reduceGroup" function on a 
data set, and the
  * result data set produced by the function.
@@ -185,7 +184,6 @@ public class GroupReduceOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT
                else {
                        throw new UnsupportedOperationException("Unrecognized 
key type.");
                }
-               
        }
        
        
@@ -215,7 +213,4 @@ public class GroupReduceOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT
                
                return reducer;
        }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
index 89290f0..36bae6b 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
@@ -78,47 +76,11 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
 
 
                @Override
-               public void coGroup(Iterator<Tuple2<K, I1>> records1, 
Iterator<Tuple2<K, I2>> records2, Collector<OUT> out) throws Exception {
-                       iter1.set(records1);
-                       iter2.set(records2);
+               public void coGroup(Iterable<Tuple2<K, I1>> records1, 
Iterable<Tuple2<K, I2>> records2, Collector<OUT> out) throws Exception {
+                       iter1.set(records1.iterator());
+                       iter2.set(records2.iterator());
                        this.wrappedFunction.coGroup(iter1, iter2, out);
                }
                
        }
-       
-       public static class UnwrappingKeyIterator<K, I1> implements 
Iterator<I1> {
-
-               private Iterator<Tuple2<K, I1>> outerIterator;
-               I1 firstValue;
-               
-               public UnwrappingKeyIterator(Iterator<Tuple2<K, I1>> records1) {
-                       this.outerIterator = records1;
-                       this.firstValue = null;
-               }
-               
-               public UnwrappingKeyIterator(Iterator<Tuple2<K, I1>> records1, 
I1 firstValue ) {
-                       this.outerIterator = records1;
-                       this.firstValue = firstValue;
-               }
-               
-               @Override
-               public boolean hasNext() {
-                       return firstValue != null || outerIterator.hasNext();
-               }
-
-               @Override
-               public I1 next() {
-                       if(firstValue != null) {
-                               firstValue = null;
-                               return firstValue;
-                       }
-                       return outerIterator.next().getField(1);
-               }
-
-               @Override
-               public void remove() {
-                       throw new UnsupportedOperationException();
-               }
-               
-       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
index 73ea004..efd52d5 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
@@ -68,18 +68,10 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
                        super(wrapped);
                }
 
-               //@SuppressWarnings("unchecked")
-               //@Override
-               //public OUT join(Tuple2<K, I1> value1, Tuple2<K, I2> value2) 
throws Exception {
-               //      return wrappedFunction.join((I1)(value1.getField(1)), 
(I2)(value2.getField(1)));
-               //}
-
                @SuppressWarnings("unchecked")
                @Override
                public void join (Tuple2<K, I1> value1, Tuple2<K, I2> value2, 
Collector<OUT> collector) throws Exception {
                        wrappedFunction.join ((I1)(value1.getField(1)), 
(I2)(value2.getField(1)), collector);
                }
-               
        }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 29eb5ed..7e3f0e5 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -65,14 +63,14 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> 
extends GroupReduceOp
 
 
                @Override
-               public void reduce(Iterator<Tuple2<K, IN>> values, 
Collector<OUT> out) throws Exception {
-                       iter.set(values);
+               public void reduce(Iterable<Tuple2<K, IN>> values, 
Collector<OUT> out) throws Exception {
+                       iter.set(values.iterator());
                        this.wrappedFunction.reduce(iter, out);
                }
 
                @Override
-               public void combine(Iterator<Tuple2<K, IN>> values, 
Collector<Tuple2<K, IN>> out) throws Exception {
-                               iter.set(values);
+               public void combine(Iterable<Tuple2<K, IN>> values, 
Collector<Tuple2<K, IN>> out) throws Exception {
+                               iter.set(values.iterator());
                                coll.set(out);
                                this.wrappedFunction.combine(iter, coll);
                }
@@ -98,8 +96,8 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> 
extends GroupReduceOp
        
        
                @Override
-               public void reduce(Iterator<Tuple2<K, IN>> values, 
Collector<OUT> out) throws Exception {
-                       iter.set(values);
+               public void reduce(Iterable<Tuple2<K, IN>> values, 
Collector<OUT> out) throws Exception {
+                       iter.set(values.iterator());
                        this.wrappedFunction.reduce(iter, out);
                }
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
index 1f2c208..c09f3a8 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
@@ -21,20 +21,23 @@ package org.apache.flink.api.java.operators.translation;
 import java.util.Iterator;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TraversableOnceException;
 
 /**
  * An iterator that reads 2-tuples (key value pairs) and returns only the 
values (second field).
  * The iterator also tracks the keys, as the pairs flow though it.
  */
-public class TupleUnwrappingIterator<T, K> implements Iterator<T>, 
java.io.Serializable {
+public class TupleUnwrappingIterator<T, K> implements Iterator<T>, 
Iterable<T>, java.io.Serializable {
 
        private static final long serialVersionUID = 1L;
        
        private K lastKey; 
        private Iterator<Tuple2<K, T>> iterator;
+       private boolean iteratorAvailable;
        
        public void set(Iterator<Tuple2<K, T>> iterator) {
                this.iterator = iterator;
+               this.iteratorAvailable = true;
        }
        
        public K getLastKey() {
@@ -57,4 +60,14 @@ public class TupleUnwrappingIterator<T, K> implements 
Iterator<T>, java.io.Seria
        public void remove() {
                throw new UnsupportedOperationException();
        }
+
+       @Override
+       public Iterator<T> iterator() {
+               if (iteratorAvailable) {
+                       iteratorAvailable = false;
+                       return this;
+               } else {
+                       throw new TraversableOnceException();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 267d879..748e3f3 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -42,7 +42,7 @@ public abstract class WrappingFunction<T extends Function> 
extends AbstractRichF
        
        private static final long serialVersionUID = 1L;
 
-       protected final T wrappedFunction;
+       protected T wrappedFunction;
 
        protected WrappingFunction(T wrappedFunction) {
                this.wrappedFunction = wrappedFunction;
@@ -174,6 +174,5 @@ public abstract class WrappingFunction<T extends Function> 
extends AbstractRichF
                public <T extends Value> T getPreviousIterationAggregate(String 
name) {
                        return ((IterationRuntimeContext) 
context).<T>getPreviousIterationAggregate(name);
                }
-               
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
index e9b5c25..fc7ef49 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
 /**
  * The CoGroupFunction is the base class for functions that are invoked by a 
{@link org.apache.flink.api.java.record.operators.CoGroupOperator}.
  */
-public abstract class CoGroupFunction extends AbstractRichFunction implements 
org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
+public abstract class CoGroupFunction extends AbstractRichFunction {
 
        private static final long serialVersionUID = 1L;
 
@@ -45,7 +45,5 @@ public abstract class CoGroupFunction extends 
AbstractRichFunction implements or
         *                   runtime catches an exception, it aborts the task 
and lets the fail-over logic
         *                   decide whether to retry the task execution.
         */
-       @Override
        public abstract void coGroup(Iterator<Record> records1, 
Iterator<Record> records2, Collector<Record> out) throws Exception;
-       
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
index a1e6369..9848499 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
@@ -16,14 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.record.functions;
 
 import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
@@ -31,7 +28,7 @@ import org.apache.flink.util.Collector;
  * The ReduceFunction must be extended to provide a reducer implementation, as 
invoked by a
  * {@link org.apache.flink.api.java.record.operators.ReduceOperator}.
  */
-public abstract class ReduceFunction extends AbstractRichFunction implements 
GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> {
+public abstract class ReduceFunction extends AbstractRichFunction {
        
        private static final long serialVersionUID = 1L;
        
@@ -47,7 +44,6 @@ public abstract class ReduceFunction extends 
AbstractRichFunction implements Gro
         *                   runtime catches an exception, it aborts the reduce 
task and lets the fail-over logic
         *                   decide whether to retry the reduce execution.
         */
-       @Override
        public abstract void reduce(Iterator<Record> records, Collector<Record> 
out) throws Exception;
 
        /**
@@ -71,7 +67,6 @@ public abstract class ReduceFunction extends 
AbstractRichFunction implements Gro
         *                   runtime catches an exception, it aborts the 
combine task and lets the fail-over logic
         *                   decide whether to retry the combiner execution.
         */
-       @Override
        public void combine(Iterator<Record> records, Collector<Record> out) 
throws Exception {
                // to be implemented, if the reducer should use a combiner. 
Note that the combining method
                // is only used, if the stub class is further annotated with 
the annotation

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
index a8cedeb..c958b84 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
@@ -16,9 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.record.operators;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -29,13 +31,15 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.RecordOperator;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.api.java.record.functions.CoGroupFunction;
 import org.apache.flink.api.java.record.functions.FunctionAnnotation;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
 
 /**
  * CoGroupOperator that applies a {@link CoGroupFunction} to groups of records 
sharing
@@ -43,7 +47,7 @@ import org.apache.flink.types.Record;
  * 
  * @see CoGroupFunction
  */
-public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, 
Record, CoGroupFunction> implements RecordOperator {
+public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, 
Record, org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, 
Record>> implements RecordOperator {
        
        /**
         * The types of the keys that the operator groups on.
@@ -61,7 +65,8 @@ public class CoGroupOperator extends 
CoGroupOperatorBase<Record, Record, Record,
         * @param keyColumn2 The position of the key in the second input's 
records.
         */
        public static Builder builder(CoGroupFunction udf, Class<? extends 
Key<?>> keyClass, int keyColumn1, int keyColumn2) {
-               return new Builder(new 
UserCodeObjectWrapper<CoGroupFunction>(udf), keyClass, keyColumn1, keyColumn2);
+               WrappingCoGroupFunction wrapper = new 
WrappingCoGroupFunction(udf);
+               return new Builder(new 
UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record,
 Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2);
        }
        
        /**
@@ -75,7 +80,8 @@ public class CoGroupOperator extends 
CoGroupOperatorBase<Record, Record, Record,
        public static Builder builder(Class<? extends CoGroupFunction> udf, 
Class<? extends Key<?>> keyClass,
                        int keyColumn1, int keyColumn2)
        {
-               return new Builder(new 
UserCodeClassWrapper<CoGroupFunction>(udf), keyClass, keyColumn1, keyColumn2);
+               WrappingCoGroupFunction wrapper = new 
WrappingClassCoGroupFunction(udf);
+               return new Builder(new 
UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record,
 Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2);
        }
        
        /**
@@ -96,7 +102,9 @@ public class CoGroupOperator extends 
CoGroupOperatorBase<Record, Record, Record,
                setBroadcastVariables(builder.broadcastInputs);
                setGroupOrderForInputOne(builder.secondaryOrder1);
                setGroupOrderForInputTwo(builder.secondaryOrder2);
-               
setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(builder.udf));
+               
+               CoGroupFunction function = ((WrappingCoGroupFunction) 
builder.udf.getUserCodeObject()).getWrappedFunction();
+               
setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(new 
UserCodeObjectWrapper<CoGroupFunction>(function)));
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -115,7 +123,7 @@ public class CoGroupOperator extends 
CoGroupOperatorBase<Record, Record, Record,
        public static class Builder {
                
                /* The required parameters */
-               private final UserCodeWrapper<CoGroupFunction> udf;
+               private final 
UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, 
Record, Record>> udf;
                private final List<Class<? extends Key<?>>> keyClasses;
                private final List<Integer> keyColumns1;
                private final List<Integer> keyColumns2;
@@ -136,7 +144,7 @@ public class CoGroupOperator extends 
CoGroupOperatorBase<Record, Record, Record,
                 * @param keyColumn1 The position of the key in the first 
input's records.
                 * @param keyColumn2 The position of the key in the second 
input's records.
                 */
-               protected Builder(UserCodeWrapper<CoGroupFunction> udf, Class<? 
extends Key<?>> keyClass,
+               protected 
Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record,
 Record, Record>> udf, Class<? extends Key<?>> keyClass,
                                int keyColumn1, int keyColumn2)
                {
                        this.udf = udf;
@@ -157,7 +165,7 @@ public class CoGroupOperator extends 
CoGroupOperatorBase<Record, Record, Record,
                 * 
                 * @param udf The {@link CoGroupFunction} implementation for 
this CoGroup operator.
                 */
-               protected Builder(UserCodeWrapper<CoGroupFunction> udf) {
+               protected 
Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record,
 Record, Record>> udf) {
                        this.udf = udf;
                        this.keyClasses = new ArrayList<Class<? extends 
Key<?>>>();
                        this.keyColumns1 = new ArrayList<Integer>();
@@ -338,4 +346,39 @@ public class CoGroupOperator extends 
CoGroupOperatorBase<Record, Record, Record,
                        return new CoGroupOperator(this);
                }
        }
+       
+       // 
============================================================================================
+       
+       public static class WrappingCoGroupFunction extends 
WrappingFunction<CoGroupFunction> 
+                       implements 
org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
+               
+               private static final long serialVersionUID = 1L;
+               
+               public WrappingCoGroupFunction(CoGroupFunction coGrouper) {
+                       super(coGrouper);
+               }
+               
+               @Override
+               public void coGroup(Iterable<Record> records1, Iterable<Record> 
records2, Collector<Record> out) throws Exception {
+                       this.wrappedFunction.coGroup(records1.iterator(), 
records2.iterator(), out);
+               }
+       }
+       
+       public static final class WrappingClassCoGroupFunction extends 
WrappingCoGroupFunction {
+               
+               private static final long serialVersionUID = 1L;
+               
+               public WrappingClassCoGroupFunction(Class<? extends 
CoGroupFunction> reducer) {
+                       super(InstantiationUtil.instantiate(reducer));
+               }
+               
+               private void writeObject(ObjectOutputStream out) throws 
IOException {
+                       out.writeObject(wrappedFunction.getClass());
+               }
+
+               private void readObject(ObjectInputStream in) throws 
IOException, ClassNotFoundException {
+                       Class<?> clazz = (Class<?>) in.readObject();
+                       this.wrappedFunction = (CoGroupFunction) 
InstantiationUtil.instantiate(clazz);
+               }
+       }
 }

Reply via email to