http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
index a325e33..a29d4e0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.drivers;
 
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -152,11 +150,10 @@ public class GroupReduceDriverTest {
        public static final class ConcatSumReducer extends 
RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
 
                @Override
-               public void reduce(Iterator<Tuple2<String, Integer>> values, 
Collector<Tuple2<String, Integer>> out) {
-                       Tuple2<String, Integer> current = values.next();
+               public void reduce(Iterable<Tuple2<String, Integer>> values, 
Collector<Tuple2<String, Integer>> out) {
+                       Tuple2<String, Integer> current = new Tuple2<String, 
Integer>("", 0);
                        
-                       while (values.hasNext()) {
-                               Tuple2<String, Integer> next = values.next();
+                       for (Tuple2<String, Integer> next : values) {
                                next.f0 = current.f0 + next.f0;
                                next.f1 = current.f1 + next.f1;
                                current = next;
@@ -169,11 +166,10 @@ public class GroupReduceDriverTest {
        public static final class ConcatSumMutableReducer extends 
RichGroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, 
IntValue>> {
 
                @Override
-               public void reduce(Iterator<Tuple2<StringValue, IntValue>> 
values, Collector<Tuple2<StringValue, IntValue>> out) {
-                       Tuple2<StringValue, IntValue> current = values.next();
+               public void reduce(Iterable<Tuple2<StringValue, IntValue>> 
values, Collector<Tuple2<StringValue, IntValue>> out) {
+                       Tuple2<StringValue, IntValue> current = new 
Tuple2<StringValue, IntValue>(new StringValue(""), new IntValue(0));
                        
-                       while (values.hasNext()) {
-                               Tuple2<StringValue, IntValue> next = 
values.next();
+                       for (Tuple2<StringValue, IntValue> next : values) {
                                next.f0.append(current.f0);
                                next.f1.setValue(current.f1.getValue() + 
next.f1.getValue());
                                current = next;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 7020f89..4b3c197 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;
@@ -26,13 +25,12 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import 
org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
 import org.apache.flink.configuration.Configuration;
@@ -273,7 +271,7 @@ public class CombiningUnilateralSortMergerITCase {
 
        // 
--------------------------------------------------------------------------------------------
        
-       public static class TestCountCombiner extends ReduceFunction {
+       public static class TestCountCombiner extends 
RichGroupReduceFunction<Record, Record> {
                private static final long serialVersionUID = 1L;
                
                private final IntValue count = new IntValue();
@@ -284,11 +282,11 @@ public class CombiningUnilateralSortMergerITCase {
                
                
                @Override
-               public void combine(Iterator<Record> values, Collector<Record> 
out) {
+               public void combine(Iterable<Record> values, Collector<Record> 
out) {
                        Record rec = null;
                        int cnt = 0;
-                       while (values.hasNext()) {
-                               rec = values.next();
+                       for (Record next : values) {
+                               rec = next;
                                cnt += rec.getField(1, 
IntValue.class).getValue();
                        }
                        
@@ -298,7 +296,7 @@ public class CombiningUnilateralSortMergerITCase {
                }
 
                @Override
-               public void reduce(Iterator<Record> values, Collector<Record> 
out) {}
+               public void reduce(Iterable<Record> values, Collector<Record> 
out) {}
                
                @Override
                public void open(Configuration parameters) throws Exception {
@@ -311,7 +309,7 @@ public class CombiningUnilateralSortMergerITCase {
                }
        }
 
-       public static class TestCountCombiner2 extends ReduceFunction {
+       public static class TestCountCombiner2 extends 
RichGroupReduceFunction<Record, Record> {
                private static final long serialVersionUID = 1L;
                
                public volatile boolean opened = false;
@@ -319,11 +317,11 @@ public class CombiningUnilateralSortMergerITCase {
                public volatile boolean closed = false;
                
                @Override
-               public void combine(Iterator<Record> values, Collector<Record> 
out) {
+               public void combine(Iterable<Record> values, Collector<Record> 
out) {
                        Record rec = null;
                        int cnt = 0;
-                       while (values.hasNext()) {
-                               rec = values.next();
+                       for (Record next : values) {
+                               rec = next;
                                cnt += Integer.parseInt(rec.getField(1, 
TestData.Value.class).toString());
                        }
 
@@ -331,7 +329,7 @@ public class CombiningUnilateralSortMergerITCase {
                }
 
                @Override
-               public void reduce(Iterator<Record> values, Collector<Record> 
out) {
+               public void reduce(Iterable<Record> values, Collector<Record> 
out) {
                        // yo, nothing, mon
                }
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
index a66fb5d..48a4b91 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
@@ -116,8 +116,8 @@ public class SortMergeCoGroupIteratorITCase
                        final TestData.Key key = new TestData.Key();
                        while (iterator.next())
                        {
-                               Iterator<Record> iter1 = iterator.getValues1();
-                               Iterator<Record> iter2 = iterator.getValues2();
+                               Iterator<Record> iter1 = 
iterator.getValues1().iterator();
+                               Iterator<Record> iter2 = 
iterator.getValues2().iterator();
                                
                                TestData.Value v1 = null;
                                TestData.Value v2 = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
index e9010d2..27fa540 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,17 +39,15 @@ import org.junit.Test;
 /**
  * Test for the key grouped iterator, which advances in windows containing the 
same key and provides a sub-iterator
  * over the records with the same key.
- * 
  */
-public class KeyGroupedIteratorTest
-{
+public class KeyGroupedIteratorTest {
+       
        private MutableObjectIterator<Record> sourceIter;               // the 
iterator that provides the input
        
        private KeyGroupedIterator<Record> psi;                                 
        // the grouping iterator, progressing in key steps
        
        @Before
-       public void setup()
-       {
+       public void setup() {
                final ArrayList<IntStringPair> source = new 
ArrayList<IntStringPair>();
                
                // add elements to the source
@@ -91,8 +90,7 @@ public class KeyGroupedIteratorTest
        }
 
        @Test
-       public void testNextKeyOnly() throws Exception
-       {
+       public void testNextKeyOnly() throws Exception {
                try {
                        Assert.assertTrue("KeyGroupedIterator must have another 
key.", this.psi.nextKey());
                        Assert.assertTrue("KeyGroupedIterator returned a wrong 
key.", this.psi.getComparatorWithCurrentReference().equalToReference(new 
Record(new IntValue(1))));
@@ -131,6 +129,8 @@ public class KeyGroupedIteratorTest
                try {
                        // Key 1, Value A
                        Assert.assertTrue("KeyGroupedIterator must have another 
key.", this.psi.nextKey());
+                       Assert.assertTrue(hasIterator(this.psi.getValues()));
+                       Assert.assertFalse(hasIterator(this.psi.getValues()));
                        Assert.assertTrue("KeyGroupedIterator must have another 
value.", this.psi.getValues().hasNext());
                        Assert.assertTrue("KeyGroupedIterator returned a wrong 
key.", this.psi.getComparatorWithCurrentReference().equalToReference(new 
Record(new IntValue(1))));
                        Assert.assertEquals("KeyGroupedIterator returned a 
wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -139,6 +139,8 @@ public class KeyGroupedIteratorTest
                        
                        // Key 2, Value B
                        Assert.assertTrue("KeyGroupedIterator must have another 
key.", this.psi.nextKey());
+                       Assert.assertTrue(hasIterator(this.psi.getValues()));
+                       Assert.assertFalse(hasIterator(this.psi.getValues()));
                        Assert.assertTrue("KeyGroupedIterator must have another 
value.", this.psi.getValues().hasNext());
                        Assert.assertTrue("KeyGroupedIterator returned a wrong 
key.", this.psi.getComparatorWithCurrentReference().equalToReference(new 
Record(new IntValue(2))));
                        Assert.assertEquals("KeyGroupedIterator returned a 
wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -147,6 +149,8 @@ public class KeyGroupedIteratorTest
                        
                        // Key 3, Values C, D
                        Assert.assertTrue("KeyGroupedIterator must have another 
key.", this.psi.nextKey());
+                       Assert.assertTrue(hasIterator(this.psi.getValues()));
+                       Assert.assertFalse(hasIterator(this.psi.getValues()));
                        Assert.assertTrue("KeyGroupedIterator must have another 
value.", this.psi.getValues().hasNext());
                        Assert.assertTrue("KeyGroupedIterator returned a wrong 
key.", this.psi.getComparatorWithCurrentReference().equalToReference(new 
Record(new IntValue(3))));
                        Assert.assertEquals("KeyGroupedIterator returned a 
wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -173,6 +177,8 @@ public class KeyGroupedIteratorTest
                        
                        // Key 4, Values E, F, G
                        Assert.assertTrue("KeyGroupedIterator must have another 
key.", this.psi.nextKey());
+                       Assert.assertTrue(hasIterator(this.psi.getValues()));
+                       Assert.assertFalse(hasIterator(this.psi.getValues()));
                        Assert.assertTrue("KeyGroupedIterator must have another 
value.", this.psi.getValues().hasNext());
                        Assert.assertTrue("KeyGroupedIterator returned a wrong 
key.", this.psi.getComparatorWithCurrentReference().equalToReference(new 
Record(new IntValue(4))));
                        Assert.assertEquals("KeyGroupedIterator returned a 
wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -193,6 +199,8 @@ public class KeyGroupedIteratorTest
                        
                        // Key 5, Values H, I, J, K, L
                        Assert.assertTrue("KeyGroupedIterator must have another 
key.", this.psi.nextKey());
+                       Assert.assertTrue(hasIterator(this.psi.getValues()));
+                       Assert.assertFalse(hasIterator(this.psi.getValues()));
                        Assert.assertTrue("KeyGroupedIterator must have another 
value.", this.psi.getValues().hasNext());
                        Assert.assertTrue("KeyGroupedIterator returned a wrong 
key.", this.psi.getComparatorWithCurrentReference().equalToReference(new 
Record(new IntValue(5))));
                        Assert.assertEquals("KeyGroupedIterator returned a 
wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -231,6 +239,7 @@ public class KeyGroupedIteratorTest
                        
                        Assert.assertFalse("KeyGroupedIterator must not have 
another key.", this.psi.nextKey());
                        Assert.assertFalse("KeyGroupedIterator must not have 
another key.", this.psi.nextKey());
+                       Assert.assertNull(this.psi.getValues());
                } catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail("The test encountered an unexpected 
exception.");
@@ -244,12 +253,18 @@ public class KeyGroupedIteratorTest
                        // Progression only via nextKey() and hasNext() - Key 
1, Value A
                        Assert.assertTrue("KeyGroupedIterator must have another 
key.", this.psi.nextKey());
                        Assert.assertTrue("KeyGroupedIterator must have another 
value.", this.psi.getValues().hasNext());
+                       Assert.assertTrue(hasIterator(this.psi.getValues()));
+                       Assert.assertFalse(hasIterator(this.psi.getValues()));
                        
                        // Progression only through nextKey() - Key 2, Value B
                        Assert.assertTrue("KeyGroupedIterator must have another 
key.", this.psi.nextKey());
+                       Assert.assertTrue(hasIterator(this.psi.getValues()));
+                       Assert.assertFalse(hasIterator(this.psi.getValues()));
                        
                        // Progression first though haNext() and next(), then 
through hasNext() - Key 3, Values C, D
                        Assert.assertTrue("KeyGroupedIterator must have another 
key.", this.psi.nextKey());
+                       Assert.assertTrue(hasIterator(this.psi.getValues()));
+                       Assert.assertFalse(hasIterator(this.psi.getValues()));
                        Assert.assertTrue("KeyGroupedIterator must have another 
value.", this.psi.getValues().hasNext());
                        Assert.assertTrue("KeyGroupedIterator returned a wrong 
key.", this.psi.getComparatorWithCurrentReference().equalToReference(new 
Record(new IntValue(3))));
                        Assert.assertEquals("KeyGroupedIterator returned a 
wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -260,6 +275,8 @@ public class KeyGroupedIteratorTest
                        
                        // Progression first via next() only, then hasNext() 
only Key 4, Values E, F, G
                        Assert.assertTrue("KeyGroupedIterator must have another 
key.", this.psi.nextKey());
+                       Assert.assertTrue(hasIterator(this.psi.getValues()));
+                       Assert.assertFalse(hasIterator(this.psi.getValues()));
                        Assert.assertEquals("KeyGroupedIterator returned a 
wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, 
StringValue.class));
                        Assert.assertTrue("KeyGroupedIterator must have another 
value.", this.psi.getValues().hasNext());
                        
@@ -270,6 +287,8 @@ public class KeyGroupedIteratorTest
                        Assert.assertTrue("KeyGroupedIterator returned a wrong 
key.", this.psi.getComparatorWithCurrentReference().equalToReference(new 
Record(new IntValue(5))));
                        Assert.assertEquals("KeyGroupedIterator returned a 
wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
                        Assert.assertEquals("KeyGroupedIterator returned a 
wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, 
StringValue.class));
+                       Assert.assertTrue(hasIterator(this.psi.getValues()));
+                       Assert.assertFalse(hasIterator(this.psi.getValues()));
                        Assert.assertTrue("KeyGroupedIterator must have another 
value.", this.psi.getValues().hasNext());
                        
                        // end
@@ -355,4 +374,14 @@ public class KeyGroupedIteratorTest
                        return string;
                }
        }
+       
+       public boolean hasIterator(Iterable<?> iterable) {
+               try {
+                       iterable.iterator();
+                       return true;
+               }
+               catch (TraversableOnceException e) {
+                       return false;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
index 6ea8cbf..de7b7d1 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
@@ -19,10 +19,10 @@
 
 package org.apache.flink.api.scala.functions
 
-import java.util.{ Iterator => JIterator }
-
 import scala.Iterator
 
+import java.util.{Iterator => JIterator}
+
 import org.apache.flink.api.scala.analysis.{UDTSerializer, FieldSelector, UDT}
 import org.apache.flink.api.scala.analysis.UDF1
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
index 4c26307..6a71bb7 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.scala.operators
 
 import language.experimental.macros
@@ -54,7 +53,7 @@ class CoGroupDataSetWithWhereAndEqual[LeftIn, RightIn](val 
leftKeySelection: Lis
   def flatMap[Out](fun: (Iterator[LeftIn], Iterator[RightIn]) => 
Iterator[Out]): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = 
macro CoGroupMacros.flatMap[LeftIn, RightIn, Out]
 }
 
-class NoKeyCoGroupBuilder(s: JCoGroupFunction) extends 
CoGroupOperator.Builder(new UserCodeObjectWrapper(s))
+class NoKeyCoGroupBuilder(s: JCoGroupFunction) extends 
CoGroupOperator.Builder(new UserCodeObjectWrapper(new 
CoGroupOperator.WrappingCoGroupFunction(s)))
 
 object CoGroupMacros {
   
@@ -106,7 +105,9 @@ object CoGroupMacros {
       implicit val leftInputUDT: UDT[LeftIn] = 
c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
       implicit val rightInputUDT: UDT[RightIn] = 
c.Expr[UDT[RightIn]](createUdtRightIn).splice
       implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
+      
       new CoGroupFunctionBase[LeftIn, RightIn, Out] {
+        
         override def coGroup(leftRecords: JIterator[Record], rightRecords: 
JIterator[Record], out: Collector[Record]) = {
 
           val firstLeftRecord = leftIterator.initialize(leftRecords)
@@ -177,7 +178,9 @@ object CoGroupMacros {
       implicit val leftInputUDT: UDT[LeftIn] = 
c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
       implicit val rightInputUDT: UDT[RightIn] = 
c.Expr[UDT[RightIn]](createUdtRightIn).splice
       implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
+      
       new CoGroupFunctionBase[LeftIn, RightIn, Out] {
+        
         override def coGroup(leftRecords: JIterator[Record], rightRecords: 
JIterator[Record], out: Collector[Record]) = {
           val firstLeftRecord = leftIterator.initialize(leftRecords)
           val firstRightRecord = rightIterator.initialize(rightRecords)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
index cf3a96c..f25b5a3 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.scala.operators
 
 import language.experimental.macros
@@ -191,7 +190,9 @@ object ReduceMacros {
       implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
 
       new ReduceFunctionBase[In, Out] {
-        override def reduce(records: JIterator[Record], out: 
Collector[Record]) = {
+        override def reduce(recordsIterable: JIterator[Record], out: 
Collector[Record]) = {
+          val records: JIterator[Record] = recordsIterable
+          
           if (records.hasNext) {
             val firstRecord = reduceIterator.initialize(records)
             reduceRecord.copyFrom(firstRecord, reduceForwardFrom, 
reduceForwardTo)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index e76f86d..5497004 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -270,6 +270,7 @@ public class AccumulatorITCase extends RecordAPITestBase {
                private void reduceInternal(Iterator<Record> records, 
Collector<Record> out) {
                        Record element = null;
                        int sum = 0;
+                       
                        while (records.hasNext()) {
                                element = records.next();
                                IntValue i = element.getField(1, 
IntValue.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
index bf6c37d..197a745 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
@@ -124,11 +124,13 @@ public class AccumulatorIterativeITCase extends 
RecordAPITestBase {
                }
                
                @Override
-               public void reduce(Iterator<Record> it, Collector<Record> out) {
+               public void reduce(Iterator<Record> records, Collector<Record> 
out) {
                        // Compute the sum
                        int sum = 0;
-                       while (it.hasNext()) {
-                               Integer value = 
Integer.parseInt(it.next().getField(0, StringValue.class).getValue());
+                       
+                       while (records.hasNext()) {
+                               Record r = records.next();
+                               Integer value = Integer.parseInt(r.getField(0, 
StringValue.class).getValue());
                                sum += value;
                                testCounter.add(value);
                        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 4b3a4bf..2217679 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.record.io.CsvInputFormat;
+import 
org.apache.flink.api.java.record.operators.ReduceOperator.WrappingReduceFunction;
 import 
org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
 import 
org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
 import org.apache.flink.configuration.Configuration;
@@ -252,7 +253,7 @@ public class KMeansIterativeNepheleITCase extends 
RecordAPITestBase {
                tailConfig.setOutputSerializer(outputSerializer);
                
                // the udf
-               tailConfig.setStubWrapper(new 
UserCodeObjectWrapper<RecomputeClusterCenter>(new RecomputeClusterCenter()));
+               tailConfig.setStubWrapper(new 
UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingReduceFunction(new 
RecomputeClusterCenter())));
                
                return tail;
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
index 35aea4b..b442b33 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
@@ -16,12 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
@@ -72,15 +70,13 @@ public class BulkIterationWithAllReducerITCase extends 
JavaProgramTestBase {
                }
 
                @Override
-               public void reduce(Iterator<Integer> records, 
Collector<Integer> out) {
+               public void reduce(Iterable<Integer> records, 
Collector<Integer> out) {
                        if (bcValue == null) {
                                return;
                        }
                        final int x = bcValue;
                        
-                       while (records.hasNext()) { 
-                               int y = records.next();
-
+                       for (Integer y : records) { 
                                if (y > x) {
                                        out.collect(y);
                                        return;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
index 37695b2..f2a43a8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.io.BufferedReader;
@@ -104,7 +103,8 @@ public class CoGroupConnectedComponentsITCase extends 
RecordAPITestBase {
                        long minimumComponentID = Long.MAX_VALUE;
 
                        while (candidates.hasNext()) {
-                               long candidateComponentID = 
candidates.next().getField(1, LongValue.class).getValue();
+                               Record candidate = candidates.next();
+                               long candidateComponentID = 
candidate.getField(1, LongValue.class).getValue();
                                if (candidateComponentID < minimumComponentID) {
                                        minimumComponentID = 
candidateComponentID;
                                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index 49bdb26..c6175b2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.java.functions.RichCoGroupFunction;
@@ -116,17 +114,17 @@ public class CoGroupConnectedComponentsSecondITCase 
extends JavaProgramTestBase
        public static final class MinIdAndUpdate extends 
RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> 
{
                
                @Override
-               public void coGroup(Iterator<Tuple2<Long, Long>> candidates, 
Iterator<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
-                       if (!current.hasNext()) {
+               public void coGroup(Iterable<Tuple2<Long, Long>> candidates, 
Iterable<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
+                       if (!current.iterator().hasNext()) {
                                throw new RuntimeException("Error: Id not 
encountered before.");
                        }
                        
-                       Tuple2<Long, Long> old = current.next();
+                       Tuple2<Long, Long> old = current.iterator().next();
                        
                        long minimumComponentID = Long.MAX_VALUE;
 
-                       while (candidates.hasNext()) {
-                               long candidateComponentID = 
candidates.next().f1;
+                       for (Tuple2<Long, Long> candidate : candidates) {
+                               long candidateComponentID = candidate.f1;
                                if (candidateComponentID < minimumComponentID) {
                                        minimumComponentID = 
candidateComponentID;
                                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
index d2669aa..fdfb321 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
@@ -170,8 +168,8 @@ public class DependencyConnectedComponentsITCase extends 
JavaProgramTestBase {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void reduce(Iterator<Long> values, Collector<Long> out) 
throws Exception {
-                               out.collect(values.next());
+               public void reduce(Iterable<Long> values, Collector<Long> out) {
+                               out.collect(values.iterator().next());
                }
        }
        
@@ -204,21 +202,19 @@ public class DependencyConnectedComponentsITCase extends 
JavaProgramTestBase {
                final Tuple2<Long, Long> resultVertex = new Tuple2<Long, 
Long>();
                
                @Override
-               public void reduce(Iterator<Tuple2<Long, Long>> values,
-                               Collector<Tuple2<Long, Long>> out) throws 
Exception {
+               public void reduce(Iterable<Tuple2<Long, Long>> values, 
Collector<Tuple2<Long, Long>> out) {
+                       Long vertexId = 0L;
+                       Long minimumCompId = Long.MAX_VALUE;
 
-                       final Tuple2<Long, Long> first = values.next();         
-                       final Long vertexId = first.f0;
-                       Long minimumCompId = first.f1;
-                       
-                       while ( values.hasNext() ) {
-                               Long candidateCompId = values.next().f1;
-                               if ( candidateCompId < minimumCompId ) {
+                       for (Tuple2<Long, Long> value: values) {
+                               vertexId = value.f0;
+                               Long candidateCompId = value.f1;
+                               if (candidateCompId < minimumCompId) {
                                        minimumCompId = candidateCompId;
                                }
                        }
-                       resultVertex.setField(vertexId, 0);
-                       resultVertex.setField(minimumCompId, 1);
+                       resultVertex.f0 = vertexId;
+                       resultVertex.f1 = minimumCompId;
 
                        out.collect(resultVertex);
                }
@@ -231,8 +227,8 @@ public class DependencyConnectedComponentsITCase extends 
JavaProgramTestBase {
                @Override
                public void flatMap(
                                Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> 
vertexWithNewAndOldId,
-                               Collector<Tuple2<Long, Long>> out) throws 
Exception {
-                                               
+                               Collector<Tuple2<Long, Long>> out)
+               {
                        if ( vertexWithNewAndOldId.f0.f1 < 
vertexWithNewAndOldId.f1.f1 ) {
                                out.collect(vertexWithNewAndOldId.f0);
                        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
index cb25019..3dc0bdf 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
index 6a487c4..2c777f5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.io.Serializable;
@@ -110,6 +109,7 @@ public class IterationTerminationWithTwoTails extends 
RecordAPITestBase {
                public void reduce(Iterator<Record> it, Collector<Record> out) {
                        // Compute the sum
                        int sum = 0;
+                       
                        while (it.hasNext()) {
                                sum += Integer.parseInt(it.next().getField(0, 
StringValue.class).getValue()) + 1;
                        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
index c81b32a..b3145dc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
index b62d85a..d00602f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
@@ -44,85 +44,81 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class IterationWithChainingITCase extends RecordAPITestBase {
 
-    private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + 
"1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
+       private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + 
"1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
 
-    private String dataPath;
-    private String resultPath;
+       private String dataPath;
+       private String resultPath;
 
-    public IterationWithChainingITCase(Configuration config) {
-        super(config);
+       public IterationWithChainingITCase(Configuration config) {
+               super(config);
                setTaskManagerNumSlots(DOP);
-    }
+       }
 
-    @Override
-    protected void preSubmit() throws Exception {
-        dataPath = createTempFile("data_points.txt", DATA_POINTS);
-        resultPath = getTempFilePath("result");
-    }
+       @Override
+       protected void preSubmit() throws Exception {
+               dataPath = createTempFile("data_points.txt", DATA_POINTS);
+               resultPath = getTempFilePath("result");
+       }
 
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(DATA_POINTS, resultPath);
-    }
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(DATA_POINTS, resultPath);
+       }
 
+       @Override
+       protected Plan getTestJob() {
+               Plan plan = 
getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, 
resultPath);
+               return plan;
+       }
 
-    @Override
-    protected Plan getTestJob() {
-        Plan plan = 
getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, 
resultPath);
-        return plan;
-    }
+       @Parameters
+       public static Collection<Object[]> getConfigurations() {
+               Configuration config1 = new Configuration();
+               config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP);
+               return toParameterList(config1);
+       }
 
-    @Parameters
-    public static Collection<Object[]> getConfigurations() {
-        Configuration config1 = new Configuration();
-        config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP);
-        return toParameterList(config1);
-    }
+       public static final class IdentityMapper extends MapFunction implements 
Serializable {
 
-    public static final class IdentityMapper extends MapFunction implements 
Serializable {
+               private static final long serialVersionUID = 1L;
 
-        private static final long serialVersionUID = 1L;
+               @Override
+               public void map(Record rec, Collector<Record> out) {
+                       out.collect(rec);
+               }
+       }
 
-        @Override
-        public void map(Record rec, Collector<Record> out) {
-            out.collect(rec);
-        }
-    }
+       public static final class DummyReducer extends ReduceFunction 
implements Serializable {
 
-    public static final class DummyReducer extends ReduceFunction implements 
Serializable {
+               private static final long serialVersionUID = 1L;
 
-        private static final long serialVersionUID = 1L;
+               @Override
+               public void reduce(Iterator<Record> it, Collector<Record> out) {
+                       while (it.hasNext()) {
+                               out.collect(it.next());
+                       }
+               }
+       }
 
-        @Override
-        public void reduce(Iterator<Record> it, Collector<Record> out) {
-            while (it.hasNext()) {
-                out.collect(it.next());
-            }
-        }
-    }
+       static Plan getTestPlan(int numSubTasks, String input, String output) {
 
-    static Plan getTestPlan(int numSubTasks, String input, String output) {
+               FileDataSource initialInput = new FileDataSource(new 
PointInFormat(), input, "Input");
+               initialInput.setDegreeOfParallelism(1);
 
-        FileDataSource initialInput = new FileDataSource(new PointInFormat(), 
input, "Input");
-        initialInput.setDegreeOfParallelism(1);
+               BulkIteration iteration = new BulkIteration("Loop");
+               iteration.setInput(initialInput);
+               iteration.setMaximumNumberOfIterations(2);
 
-        BulkIteration iteration = new BulkIteration("Loop");
-        iteration.setInput(initialInput);
-        iteration.setMaximumNumberOfIterations(2);
+               ReduceOperator dummyReduce = ReduceOperator.builder(new 
DummyReducer(), IntValue.class, 0).input(iteration.getPartialSolution())
+                               .name("Reduce something").build();
 
-        ReduceOperator dummyReduce = ReduceOperator.builder(new 
DummyReducer(), IntValue.class, 0)
-                .input(iteration.getPartialSolution())
-                .name("Reduce something")
-                .build();
+               MapOperator dummyMap = MapOperator.builder(new 
IdentityMapper()).input(dummyReduce).build();
+               iteration.setNextPartialSolution(dummyMap);
 
+               FileDataSink finalResult = new FileDataSink(new 
PointOutFormat(), output, iteration, "Output");
 
-        MapOperator dummyMap = MapOperator.builder(new 
IdentityMapper()).input(dummyReduce).build();
-        iteration.setNextPartialSolution(dummyMap);
-
-        FileDataSink finalResult = new FileDataSink(new PointOutFormat(), 
output, iteration, "Output");
-
-        Plan plan = new Plan(finalResult, "Iteration with chained map test");
-        plan.setDefaultParallelism(numSubTasks);
-        return plan;
-    }
+               Plan plan = new Plan(finalResult, "Iteration with chained map 
test");
+               plan.setDefaultParallelism(numSubTasks);
+               return plan;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
index aa83e9b..2a130ed 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
@@ -16,43 +16,28 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.IterativeDataSet;
 import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
 import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
-@RunWith(Parameterized.class)
-public class IterationWithUnionITCase extends RecordAPITestBase {
+public class IterationWithUnionITCase extends JavaProgramTestBase {
 
        private static final String DATAPOINTS = "0|50.90|16.20|72.08|\n" + 
"1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
 
        protected String dataPath;
        protected String resultPath;
 
-       
-       public IterationWithUnionITCase(Configuration config) {
-               super(config);
-               setTaskManagerNumSlots(DOP);
-       }
 
        @Override
        protected void preSubmit() throws Exception {
@@ -66,54 +51,36 @@ public class IterationWithUnionITCase extends 
RecordAPITestBase {
        }
 
        @Override
-       protected Plan getTestJob() {
-               return 
getPlan(config.getInteger("IterationWithUnionITCase#NumSubtasks", 1), dataPath, 
resultPath);
-       }
-
-       @Parameters
-       public static Collection<Object[]> getConfigurations() {
-               Configuration config1 = new Configuration();
-               config1.setInteger("IterationWithUnionITCase#NumSubtasks", DOP);
-
-               return toParameterList(config1);
-       }
-       
-       private static Plan getPlan(int numSubTasks, String input, String 
output) {
-               FileDataSource initialInput = new FileDataSource(new 
PointInFormat(), input, "Input");
-               initialInput.setDegreeOfParallelism(1);
+       protected void testProgram() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                
-               BulkIteration iteration = new BulkIteration("Loop");
-               iteration.setInput(initialInput);
-               iteration.setMaximumNumberOfIterations(2);
-
-               @SuppressWarnings("unchecked")
-               MapOperator map2 = MapOperator.builder(new 
IdentityMapper()).input(iteration.getPartialSolution(), 
iteration.getPartialSolution()).name("map").build();
+               DataSet<Record> initialInput = env.readFile(new 
PointInFormat(), this.dataPath).setParallelism(1);
                
-               iteration.setNextPartialSolution(map2);
-
-               FileDataSink finalResult = new FileDataSink(new 
PointOutFormat(), output, iteration, "Output");
-
-               Plan plan = new Plan(finalResult, "Iteration with union test");
-               plan.setDefaultParallelism(numSubTasks);
-               return plan;
+               IterativeDataSet<Record> iteration = initialInput.iterate(2);
+               
+               DataSet<Record> result = iteration.union(iteration).map(new 
IdentityMapper());
+               
+               iteration.closeWith(result).write(new PointOutFormat(), 
this.resultPath);
+               
+               env.execute();
        }
        
-       static final class IdentityMapper extends MapFunction implements 
Serializable {
+       static final class IdentityMapper implements MapFunction<Record, 
Record>, Serializable {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void map(Record rec, Collector<Record> out) {
-                       out.collect(rec);
+               public Record map(Record rec) {
+                       return rec;
                }
        }
 
-       static final class DummyReducer extends ReduceFunction implements 
Serializable {
+       static final class DummyReducer implements GroupReduceFunction<Record, 
Record>, Serializable {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void reduce(Iterator<Record> it, Collector<Record> out) {
-                       while (it.hasNext()) {
-                               out.collect(it.next());
+               public void reduce(Iterable<Record> it, Collector<Record> out) {
+                       for (Record r : it) {
+                               out.collect(r);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
index 4c8177a..9382708 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
@@ -16,34 +16,27 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative.aggregators;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
-
 import org.junit.Assert;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
 
-
 /**
- * 
- * Connected Components test case that uses a parametrizable aggregator
- *
+ * Connected Components test case that uses a parameterizable aggregator
  */
 public class ConnectedComponentsWithParametrizableAggregatorITCase extends 
JavaProgramTestBase {
 
@@ -147,7 +140,8 @@ public class 
ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
                }
        }
 
-       public static final class NeighborWithComponentIDJoin extends 
RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+       public static final class NeighborWithComponentIDJoin implements 
JoinFunction
+               <Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -163,24 +157,23 @@ public class 
ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
        public static final class MinimumReduce extends 
RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
-               final Tuple2<Long, Long> resultVertex = new Tuple2<Long, 
Long>();
+               
+               private final Tuple2<Long, Long> resultVertex = new 
Tuple2<Long, Long>();
 
                @Override
-               public void reduce(Iterator<Tuple2<Long, Long>> values,
-                               Collector<Tuple2<Long, Long>> out) throws 
Exception {
-
-                       final Tuple2<Long, Long> first = values.next();
-                       final Long vertexId = first.f0;
-                       Long minimumCompId = first.f1;
+               public void reduce(Iterable<Tuple2<Long, Long>> values, 
Collector<Tuple2<Long, Long>> out) {
+                       Long vertexId = 0L;
+                       Long minimumCompId = Long.MAX_VALUE;
 
-                       while (values.hasNext()) {
-                               Long candidateCompId = values.next().f1;
+                       for (Tuple2<Long, Long> value: values) {
+                               vertexId = value.f0;
+                               Long candidateCompId = value.f1;
                                if (candidateCompId < minimumCompId) {
                                        minimumCompId = candidateCompId;
                                }
                        }
-                       resultVertex.setField(vertexId, 0);
-                       resultVertex.setField(minimumCompId, 1);
+                       resultVertex.f0 = vertexId;
+                       resultVertex.f1 = minimumCompId;
 
                        out.collect(resultVertex);
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
index 104c3df..039d64e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative.aggregators;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -158,21 +156,19 @@ public class 
ConnectedComponentsWithParametrizableConvergenceITCase extends Java
                final Tuple2<Long, Long> resultVertex = new Tuple2<Long, 
Long>();
 
                @Override
-               public void reduce(Iterator<Tuple2<Long, Long>> values,
-                               Collector<Tuple2<Long, Long>> out) throws 
Exception {
-
-                       final Tuple2<Long, Long> first = values.next();         
-                       final Long vertexId = first.f0;
-                       Long minimumCompId = first.f1;
+               public void reduce(Iterable<Tuple2<Long, Long>> values, 
Collector<Tuple2<Long, Long>> out) {
+                       Long vertexId = 0L;
+                       Long minimumCompId = Long.MAX_VALUE;
 
-                       while (values.hasNext()) {
-                               Long candidateCompId = values.next().f1;
+                       for (Tuple2<Long, Long> value: values) {
+                               vertexId = value.f0;
+                               Long candidateCompId = value.f1;
                                if (candidateCompId < minimumCompId) {
                                        minimumCompId = candidateCompId;
                                }
                        }
-                       resultVertex.setField(vertexId, 0);
-                       resultVertex.setField(minimumCompId, 1);
+                       resultVertex.f0 = vertexId;
+                       resultVertex.f1 = minimumCompId;
 
                        out.collect(resultVertex);
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 3060b68..4fd22a3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -31,6 +32,8 @@ import org.apache.flink.api.java.record.functions.MapFunction;
 import org.apache.flink.api.java.record.io.CsvInputFormat;
 import org.apache.flink.api.java.record.io.CsvOutputFormat;
 import org.apache.flink.api.java.record.io.FileOutputFormat;
+import 
org.apache.flink.api.java.record.operators.ReduceOperator.WrappingReduceFunction;
+import 
org.apache.flink.api.java.record.operators.ReduceOperator.WrappingClassReduceFunction;
 import 
org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
 import 
org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
 import 
org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
@@ -317,7 +320,7 @@ public class ConnectedComponentsNepheleITCase extends 
RecordAPITestBase {
                        
intermediateConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
                        intermediateConfig.setDriverComparator(comparator, 0);
                        intermediateConfig.setStubWrapper(
-                               new 
UserCodeClassWrapper<MinimumComponentIDReduce>(MinimumComponentIDReduce.class));
+                               new 
UserCodeObjectWrapper<WrappingReduceFunction>(new 
WrappingClassReduceFunction(MinimumComponentIDReduce.class)));
                }
 
                return intermediate;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index 448b7bd..5a6e4f5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -19,13 +19,12 @@
 package org.apache.flink.test.iterative.nephele;
 
 import java.util.Collection;
-import java.util.Iterator;
 
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
 import org.apache.flink.api.java.record.io.FileOutputFormat;
 import 
org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
 import 
org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
@@ -273,14 +272,14 @@ public class IterationWithChainingNepheleITCase extends 
RecordAPITestBase {
                }
        }
 
-       public static final class DummyReducer extends ReduceFunction {
+       public static final class DummyReducer implements 
GroupReduceFunction<Record, Record> {
 
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void reduce(Iterator<Record> it, Collector<Record> out) {
-                       while (it.hasNext()) {
-                               out.collect(it.next());
+               public void reduce(Iterable<Record> it, Collector<Record> out) {
+                       for (Record r :it) {
+                               out.collect(r);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
index 1ec0eb4..cd6a89d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 
 import java.util.Iterator;
@@ -83,11 +82,13 @@ public class CustomCompensatableDotProductCoGroup extends 
AbstractRichFunction i
        }
 
        @Override
-       public void coGroup(Iterator<VertexWithRankAndDangling> 
currentPageRankIterator, Iterator<VertexWithRank> partialRanks,
+       public void coGroup(Iterable<VertexWithRankAndDangling> 
currentPageRankIterable, Iterable<VertexWithRank> partialRanks,
                        Collector<VertexWithRankAndDangling> collector)
        {
+               final Iterator<VertexWithRankAndDangling> 
currentPageRankIterator = currentPageRankIterable.iterator();
+               
                if (!currentPageRankIterator.hasNext()) {
-                       long missingVertex = partialRanks.next().getVertexID();
+                       long missingVertex = 
partialRanks.iterator().next().getVertexID();
                        throw new IllegalStateException("No current page rank 
for vertex [" + missingVertex + "]!");
                }
 
@@ -95,8 +96,8 @@ public class CustomCompensatableDotProductCoGroup extends 
AbstractRichFunction i
 
                long edges = 0;
                double summedRank = 0;
-               while (partialRanks.hasNext()) {
-                       summedRank += partialRanks.next().getRank();
+               for (VertexWithRank pr :partialRanks) {
+                       summedRank += pr.getRank();
                        edges++;
                }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
index d83b33b..8d92c59 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert
 import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
 import org.apache.flink.util.Collector;
 
+@SuppressWarnings("deprecation")
 public class CustomCompensatingMap extends AbstractRichFunction implements 
GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
        
        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
index 1e08a9f..8ec8403 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
@@ -35,12 +35,14 @@ public class CustomRankCombiner extends 
AbstractRichFunction implements GroupRed
        private final VertexWithRank accumulator = new VertexWithRank();
        
        @Override
-       public void reduce(Iterator<VertexWithRank> records, 
Collector<VertexWithRank> out) throws Exception {
+       public void reduce(Iterable<VertexWithRank> records, 
Collector<VertexWithRank> out) throws Exception {
                throw new UnsupportedOperationException();
        }
 
        @Override
-       public void combine(Iterator<VertexWithRank> records, 
Collector<VertexWithRank> out) throws Exception {
+       public void combine(Iterable<VertexWithRank> recordsIterable, 
Collector<VertexWithRank> out) throws Exception {
+               final Iterator<VertexWithRank> records = 
recordsIterable.iterator();
+               
                VertexWithRank next = records.next();
                this.accumulator.setVertexID(next.getVertexID());
                double rank = next.getRank();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
index ab3dea9..c51cf96 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative.nephele.danglingpagerank;
 
 import java.util.Iterator;
@@ -102,7 +101,8 @@ public class CompensatableDotProductCoGroup extends 
CoGroupFunction {
                long edges = 0;
                double summedRank = 0;
                while (partialRanks.hasNext()) {
-                       summedRank += partialRanks.next().getField(1, 
doubleInstance).getValue();
+                       Record pr = partialRanks.next();
+                       summedRank += pr.getField(1, doubleInstance).getValue();
                        edges++;
                }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index 3749c1d..d59d721 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.javaApiOperators;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.LinkedList;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -308,21 +307,19 @@ public class CoGroupITCase extends JavaProgramTestBase {
 
                @Override
                public void coGroup(
-                               Iterator<Tuple5<Integer, Long, Integer, String, 
Long>> first,
-                               Iterator<Tuple5<Integer, Long, Integer, String, 
Long>> second,
-                               Collector<Tuple2<Integer, Integer>> out) throws 
Exception {
-                       
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> first,
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> second,
+                               Collector<Tuple2<Integer, Integer>> out)
+               {
                        int sum = 0;
                        int id = 0;
                        
-                       while(first.hasNext()) {
-                               Tuple5<Integer, Long, Integer, String, Long> 
element = first.next();
+                       for ( Tuple5<Integer, Long, Integer, String, Long> 
element : first ) {
                                sum += element.f2;
                                id = element.f0;
                        }
                        
-                       while(second.hasNext()) {
-                               Tuple5<Integer, Long, Integer, String, Long> 
element = second.next();
+                       for ( Tuple5<Integer, Long, Integer, String, Long> 
element : second ) {
                                sum += element.f2;
                                id = element.f0;
                        }
@@ -336,27 +333,22 @@ public class CoGroupITCase extends JavaProgramTestBase {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void coGroup(Iterator<CustomType> first,
-                               Iterator<CustomType> second, 
Collector<CustomType> out)
-                               throws Exception {
+               public void coGroup(Iterable<CustomType> first, 
Iterable<CustomType> second, Collector<CustomType> out) {
                        
                        CustomType o = new CustomType(0,0,"test");
                        
-                       while(first.hasNext()) {
-                               CustomType element = first.next();
+                       for ( CustomType element : first ) {
                                o.myInt = element.myInt;
                                o.myLong += element.myLong;
                        }
                        
-                       while(second.hasNext()) {
-                               CustomType element = second.next();
+                       for ( CustomType element : second ) {
                                o.myInt = element.myInt;
                                o.myLong += element.myLong;
                        }
                        
                        out.collect(o);
                }
-               
        }
        
        public static class MixedCoGroup implements 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, 
Tuple3<Integer, Long, String>> {
@@ -365,21 +357,19 @@ public class CoGroupITCase extends JavaProgramTestBase {
 
                @Override
                public void coGroup(
-                               Iterator<Tuple5<Integer, Long, Integer, String, 
Long>> first,
-                               Iterator<CustomType> second,
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> first,
+                               Iterable<CustomType> second,
                                Collector<Tuple3<Integer, Long, String>> out) 
throws Exception {
                        
                        long sum = 0;
                        int id = 0;
                        
-                       while(first.hasNext()) {
-                               Tuple5<Integer, Long, Integer, String, Long> 
element = first.next();
+                       for ( Tuple5<Integer, Long, Integer, String, Long> 
element : first ) {
                                sum += element.f0;
                                id = element.f2;
                        }
                        
-                       while(second.hasNext()) {
-                               CustomType element = second.next();
+                       for (CustomType element : second) {
                                id = element.myInt;
                                sum += element.myLong;
                        }
@@ -394,20 +384,18 @@ public class CoGroupITCase extends JavaProgramTestBase {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void coGroup(Iterator<CustomType> first,
-                               Iterator<Tuple5<Integer, Long, Integer, String, 
Long>> second,
-                               Collector<CustomType> out) throws Exception {
-                       
+               public void coGroup(Iterable<CustomType> first,
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> second,
+                               Collector<CustomType> out)
+               {
                        CustomType o = new CustomType(0,0,"test");
                        
-                       while(first.hasNext()) {
-                               CustomType element = first.next();
+                       for (CustomType element : first) {
                                o.myInt = element.myInt;
                                o.myLong += element.myLong;
                        }
                        
-                       while(second.hasNext()) {
-                               Tuple5<Integer, Long, Integer, String, Long> 
element = second.next();
+                       for (Tuple5<Integer, Long, Integer, String, Long> 
element : second) {
                                o.myInt = element.f2;
                                o.myLong += element.f0;
                        }
@@ -423,14 +411,14 @@ public class CoGroupITCase extends JavaProgramTestBase {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void coGroup(Iterator<Tuple3<Integer, Long, String>> 
first,
-                               Iterator<Tuple3<Integer, Long, String>> second,
-                               Collector<Tuple3<Integer, Long, String>> out) 
throws Exception {
-
-                       while(first.hasNext()) {
-                               Tuple3<Integer, Long, String> element = 
first.next();
-                               if(element.f0 < 6)
+               public void coGroup(Iterable<Tuple3<Integer, Long, String>> 
first,
+                               Iterable<Tuple3<Integer, Long, String>> second,
+                               Collector<Tuple3<Integer, Long, String>> out)
+               {
+                       for (Tuple3<Integer, Long, String> element : first) {
+                               if(element.f0 < 6) {
                                        out.collect(element);
+                               }
                        }
                }
        }
@@ -441,20 +429,16 @@ public class CoGroupITCase extends JavaProgramTestBase {
 
                @Override
                public void coGroup(
-                               Iterator<Tuple5<Integer, Long, Integer, String, 
Long>> first,
-                               Iterator<Tuple5<Integer, Long, Integer, String, 
Long>> second,
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> first,
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> second,
                                Collector<Tuple5<Integer, Long, Integer, 
String, Long>> out)
-                               throws Exception {
-
-                       while(second.hasNext()) {
-                               Tuple5<Integer, Long, Integer, String, Long> 
element = second.next();
-                               if(element.f0 < 4)
+               {
+                       for (Tuple5<Integer, Long, Integer, String, Long> 
element : second) {
+                               if(element.f0 < 4) {
                                        out.collect(element);
+                               }
                        }
-                       
                }
-
-
        }
        
        public static class Tuple5CoGroupBC extends 
RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, 
Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, 
Integer>> {
@@ -477,21 +461,19 @@ public class CoGroupITCase extends JavaProgramTestBase {
 
                @Override
                public void coGroup(
-                               Iterator<Tuple5<Integer, Long, Integer, String, 
Long>> first,
-                               Iterator<Tuple5<Integer, Long, Integer, String, 
Long>> second,
-                               Collector<Tuple3<Integer, Integer, Integer>> 
out) throws Exception {
-                       
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> first,
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> second,
+                               Collector<Tuple3<Integer, Integer, Integer>> 
out)
+               {
                        int sum = 0;
                        int id = 0;
                        
-                       while(first.hasNext()) {
-                               Tuple5<Integer, Long, Integer, String, Long> 
element = first.next();
+                       for (Tuple5<Integer, Long, Integer, String, Long> 
element : first) {
                                sum += element.f2;
                                id = element.f0;
                        }
                        
-                       while(second.hasNext()) {
-                               Tuple5<Integer, Long, Integer, String, Long> 
element = second.next();
+                       for (Tuple5<Integer, Long, Integer, String, Long> 
element : second) {
                                sum += element.f2;
                                id = element.f0;
                        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 7376e86..bd10c5e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -422,16 +422,13 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
        public static class Tuple3GroupReduce implements 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
                private static final long serialVersionUID = 1L;
 
-
                @Override
-               public void reduce(Iterator<Tuple3<Integer, Long, String>> 
values,
-                               Collector<Tuple2<Integer, Long>> out) throws 
Exception {
+               public void reduce(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple2<Integer, Long>> out) {
                        
                        int i = 0;
                        long l = 0l;
                        
-                       while(values.hasNext()) {
-                               Tuple3<Integer, Long, String> t = values.next();
+                       for (Tuple3<Integer, Long, String> t : values) {
                                i += t.f0;
                                l = t.f1;
                        }
@@ -446,24 +443,22 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
 
 
                @Override
-               public void reduce(Iterator<Tuple3<Integer, Long, String>> 
values,
-                               Collector<Tuple3<Integer, Long, String>> out) 
throws Exception {
-                       
-                       Tuple3<Integer, Long, String> t = values.next();
-                       
-                       int sum = t.f0;
-                       long key = t.f1;
-                       String concat = t.f2;
+               public void reduce(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
+                       int sum = 0;
+                       long key = 0;
+                       StringBuilder concat = new StringBuilder();
                        
-                       while(values.hasNext()) {
-                               t = values.next();
-                               
-                               sum += t.f0;
-                               concat += "-"+t.f2;
+                       for (Tuple3<Integer, Long, String> next : values) {
+                               sum += next.f0;
+                               key = next.f1;
+                               concat.append(next.f2).append("-");
                        }
                        
-                       out.collect(new Tuple3<Integer, Long, String>(sum, key, 
concat));
+                       if (concat.length() > 0) {
+                               concat.setLength(concat.length() - 1);
+                       }
                        
+                       out.collect(new Tuple3<Integer, Long, String>(sum, key, 
concat.toString()));
                }
        }
        
@@ -472,16 +467,14 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
 
                @Override
                public void reduce(
-                               Iterator<Tuple5<Integer, Long, Integer, String, 
Long>> values,
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> values,
                                Collector<Tuple5<Integer, Long, Integer, 
String, Long>> out)
-                               throws Exception {
-                       
+               {
                        int i = 0;
                        long l = 0l;
                        long l2 = 0l;
                        
-                       while(values.hasNext()) {
-                               Tuple5<Integer, Long, Integer, String, Long> t 
= values.next();
+                       for ( Tuple5<Integer, Long, Integer, String, Long> t : 
values ) {
                                i = t.f0;
                                l += t.f1;
                                l2 = t.f4;
@@ -496,20 +489,19 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                
 
                @Override
-               public void reduce(Iterator<CustomType> values,
-                               Collector<CustomType> out) throws Exception {
+               public void reduce(Iterable<CustomType> values, 
Collector<CustomType> out) {
+                       final Iterator<CustomType> iter = values.iterator();
                        
                        CustomType o = new CustomType();
-                       CustomType c = values.next();
+                       CustomType c = iter.next();
                        
                        o.myString = "Hello!";
                        o.myInt = c.myInt;
                        o.myLong = c.myLong;
                        
-                       while(values.hasNext()) {
-                               c = values.next();
-                               o.myLong += c.myLong;
-
+                       while (iter.hasNext()) {
+                               CustomType next = iter.next();
+                               o.myLong += next.myLong;
                        }
                        
                        out.collect(o);
@@ -522,11 +514,9 @@ public class GroupReduceITCase extends JavaProgramTestBase 
{
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void reduce(Iterator<Tuple3<Integer, Long, String>> 
values,
-                               Collector<Tuple3<Integer, Long, String>> out) 
throws Exception {
+               public void reduce(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
 
-                       while(values.hasNext()) {
-                               Tuple3<Integer, Long, String> t = values.next();
+                       for ( Tuple3<Integer, Long, String> t : values ) {
                                
                                if(t.f0 < 4) {
                                        t.f2 = "Hi!";
@@ -544,14 +534,12 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                private static final long serialVersionUID = 1L;
                
                @Override
-               public void reduce(Iterator<Tuple3<Integer, Long, String>> 
values,
-                               Collector<Tuple3<Integer, Long, String>> out) 
throws Exception {
+               public void reduce(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
 
                        int i = 0;
                        long l = 0l;
                        
-                       while(values.hasNext()) {
-                               Tuple3<Integer, Long, String> t = values.next();
+                       for ( Tuple3<Integer, Long, String> t : values ) {
                                i += t.f0;
                                l += t.f1;
                        }
@@ -564,21 +552,13 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                private static final long serialVersionUID = 1L;
                
                @Override
-               public void reduce(Iterator<CustomType> values,
-                               Collector<CustomType> out) throws Exception {
+               public void reduce(Iterable<CustomType> values, 
Collector<CustomType> out) {
 
-                       CustomType o = new CustomType();
-                       CustomType c = values.next();
-                       
-                       o.myString = "Hello!";
-                       o.myInt = c.myInt;
-                       o.myLong = c.myLong;
-                       
+                       CustomType o = new CustomType(0, 0, "Hello!");
                        
-                       while(values.hasNext()) {
-                               c = values.next();
-                               o.myInt += c.myInt;
-                               o.myLong += c.myLong;
+                       for (CustomType next : values) {
+                               o.myInt += next.myInt;
+                               o.myLong += next.myLong;
                        }
                        
                        out.collect(o);
@@ -602,14 +582,12 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                }
 
                @Override
-               public void reduce(Iterator<Tuple3<Integer, Long, String>> 
values,
-                               Collector<Tuple3<Integer, Long, String>> out) 
throws Exception {
+               public void reduce(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
                                
                        int i = 0;
                        long l = 0l;
                        
-                       while(values.hasNext()) {
-                               Tuple3<Integer, Long, String> t = values.next();
+                       for ( Tuple3<Integer, Long, String> t : values ) {
                                i += t.f0;
                                l = t.f1;
                        }
@@ -624,12 +602,11 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void combine(Iterator<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+               public void combine(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
 
                        Tuple3<Integer, Long, String> o = new Tuple3<Integer, 
Long, String>(0, 0l, "");
 
-                       while(values.hasNext()) {
-                               Tuple3<Integer, Long, String> t = values.next();
+                       for ( Tuple3<Integer, Long, String> t : values ) {
                                o.f0 += t.f0;
                                o.f1 = t.f1;
                                o.f2 = "test"+o.f1;
@@ -639,14 +616,12 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                }
 
                @Override
-               public void reduce(Iterator<Tuple3<Integer, Long, String>> 
values,
-                               Collector<Tuple2<Integer, String>> out) throws 
Exception {
+               public void reduce(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple2<Integer, String>> out) {
 
                        int i = 0;
                        String s = "";
 
-                       while(values.hasNext()) {
-                               Tuple3<Integer, Long, String> t = values.next();
+                       for ( Tuple3<Integer, Long, String> t : values ) {
                                i += t.f0;
                                s = t.f2;
                        }
@@ -661,12 +636,11 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                private static final long serialVersionUID = 1L;
                
                @Override
-               public void combine(Iterator<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
+               public void combine(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
                        
                        Tuple3<Integer, Long, String> o = new Tuple3<Integer, 
Long, String>(0, 0l, "");
                        
-                       while(values.hasNext()) {
-                               Tuple3<Integer, Long, String> t = values.next();
+                       for ( Tuple3<Integer, Long, String> t : values ) {
                                o.f0 += t.f0;
                                o.f1 += t.f1;
                                o.f2 += "test";
@@ -676,13 +650,12 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                }
 
                @Override
-               public void reduce(Iterator<Tuple3<Integer, Long, String>> 
values, Collector<Tuple2<Integer, String>> out) {
+               public void reduce(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple2<Integer, String>> out) {
                        
                        int i = 0;
                        String s = "";
                        
-                       while(values.hasNext()) {
-                               Tuple3<Integer, Long, String> t = values.next();
+                       for ( Tuple3<Integer, Long, String> t : values ) {
                                i += t.f0 + t.f1;
                                s += t.f2;
                        }
@@ -697,12 +670,11 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                private static final long serialVersionUID = 1L;
                
                @Override
-               public void combine(Iterator<CustomType> values, 
Collector<CustomType> out) throws Exception {
+               public void combine(Iterable<CustomType> values, 
Collector<CustomType> out) throws Exception {
                        
                        CustomType o = new CustomType();
                        
-                       while(values.hasNext()) {
-                               CustomType c = values.next();
+                       for ( CustomType c : values ) {
                                o.myInt = c.myInt;
                                o.myLong += c.myLong;
                                o.myString = "test"+c.myInt;
@@ -712,13 +684,11 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                }
 
                @Override
-               public void reduce(Iterator<CustomType> values,
-                               Collector<CustomType> out) throws Exception {
+               public void reduce(Iterable<CustomType> values, 
Collector<CustomType> out)  {
                        
                        CustomType o = new CustomType(0, 0, "");
                        
-                       while(values.hasNext()) {
-                               CustomType c = values.next();
+                       for ( CustomType c : values) {
                                o.myInt = c.myInt;
                                o.myLong += c.myLong;
                                o.myString = c.myString;
@@ -733,6 +703,5 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 
                @Override
                public T map(T value) { return value; }
-               
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
index 5a895d3..865c550 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.operators;
 
 import java.io.FileNotFoundException;
@@ -26,8 +25,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.record.functions.CoGroupFunction;
 import org.apache.flink.api.java.record.io.DelimitedInputFormat;
@@ -46,13 +43,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-/**
- */
 @RunWith(Parameterized.class)
 public class CoGroupITCase extends RecordAPITestBase {
 
-       private static final Log LOG = LogFactory.getLog(CoGroupITCase.class);
-
        String leftInPath = null;
        String rightInPath = null;
        String resultPath = null;
@@ -89,8 +82,6 @@ public class CoGroupITCase extends RecordAPITestBase {
                        target.setField(0, keyString);
                        target.setField(1, valueString);
 
-                       LOG.debug("Read in: [" + keyString.getValue() + "," + 
valueString.getValue() + "]");
-
                        return target;
                }
 
@@ -112,9 +103,6 @@ public class CoGroupITCase extends RecordAPITestBase {
                        this.buffer.append('\n');
                        
                        byte[] bytes = this.buffer.toString().getBytes();
-
-                       LOG.debug("Writing out: [" + keyString.toString() + "," 
+ valueInteger.getValue() + "]");
-
                        this.stream.write(bytes);
                }
        }
@@ -124,33 +112,28 @@ public class CoGroupITCase extends RecordAPITestBase {
 
                private StringValue keyString = new StringValue();
                private StringValue valueString = new StringValue();
-               private Record record = new Record();
                
                @Override
                public void coGroup(Iterator<Record> records1, Iterator<Record> 
records2, Collector<Record> out) {
-                       // TODO Auto-generated method stub
                        
+                       Record record = null;
                        int sum = 0;
-                       LOG.debug("Start iterating over input1");
+                       
                        while (records1.hasNext()) {
                                record = records1.next();
                                keyString = record.getField(0, keyString);
                                valueString = record.getField(1, valueString);
                                sum += Integer.parseInt(valueString.getValue());
-
-                               LOG.debug("Processed: [" + keyString.getValue() 
+ "," + valueString.getValue() + "]");
                        }
-                       LOG.debug("Start iterating over input2");
+                       
+                       
                        while (records2.hasNext()) {
                                record = records2.next();
                                keyString = record.getField(0, keyString);
                                valueString = record.getField(1, valueString);
                                sum -= Integer.parseInt(valueString.getValue());
-
-                               LOG.debug("Processed: [" + keyString.getValue() 
+ "," + valueString.getValue() + "]");
                        }
                        record.setField(1, new IntValue(sum));
-                       LOG.debug("Finished");
                        
                        out.collect(record);
                }
@@ -197,9 +180,9 @@ public class CoGroupITCase extends RecordAPITestBase {
 
                LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
 
-               String[] localStrategies = { 
PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE};
+               String[] localStrategies = { 
PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE };
 
-               String[] shipStrategies = { 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH, };
+               String[] shipStrategies = { 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH };
 
                for (String localStrategy : localStrategies) {
                        for (String shipStrategy : shipStrategies) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
index b06cf47..2711417 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.operators;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.record.functions.ReduceFunction;
 import org.apache.flink.api.java.record.io.DelimitedInputFormat;
@@ -53,8 +50,6 @@ import java.util.LinkedList;
 
 @RunWith(Parameterized.class)
 public class ReduceITCase extends RecordAPITestBase {
-       
-       private static final Log LOG = LogFactory.getLog(ReduceITCase.class);
 
        String inPath = null;
        String resultPath = null;
@@ -83,17 +78,14 @@ public class ReduceITCase extends RecordAPITestBase {
                private StringValue combineValue = new StringValue();
 
                @Override
-               public void combine(Iterator<Record> records, Collector<Record> 
out) throws Exception {
-               
+               public void combine(Iterator<Record> records, Collector<Record> 
out) {
+                       Record record = null;
                        int sum = 0;
-                       Record record = new Record();
+                       
                        while (records.hasNext()) {
                                record = records.next();
                                combineValue = record.getField(1, combineValue);
                                sum += 
Integer.parseInt(combineValue.toString());
-
-                               LOG.debug("Processed: [" + record.getField(0, 
StringValue.class).toString() +
-                                               "," + combineValue.toString() + 
"]");
                        }
                        combineValue.setValue(sum + "");
                        record.setField(1, combineValue);
@@ -101,17 +93,14 @@ public class ReduceITCase extends RecordAPITestBase {
                }
 
                @Override
-               public void reduce(Iterator<Record> records, Collector<Record> 
out) throws Exception {
-               
+               public void reduce(Iterator<Record> records, Collector<Record> 
out) {
+                       Record record = null;
                        int sum = 0;
-                       Record record = new Record();
+                       
                        while (records.hasNext()) {
                                record = records.next();
                                reduceValue = record.getField(1, reduceValue);
                                sum += Integer.parseInt(reduceValue.toString());
-
-                               LOG.debug("Processed: [" + record.getField(0, 
StringValue.class).toString() +
-                                               "," + reduceValue.toString() + 
"]");
                        }
                        record.setField(1, new IntValue(sum));
                        out.collect(record);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
index 60b7512..0b84309 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobTests;
 
 import java.io.Serializable;

Reply via email to