[FLINK-1085] [tests] Make the combiner tests generic. Add more coverage for 
oversized records.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01c74338
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01c74338
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01c74338

Branch: refs/heads/master
Commit: 01c74338ff44ea7f3735a7eb94b2ce01ababc505
Parents: 7271881
Author: Stephan Ewen <[email protected]>
Authored: Mon Jul 13 15:12:04 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Jul 13 16:29:38 2015 +0200

----------------------------------------------------------------------
 .../operators/GroupReduceCombineDriver.java     |  60 +--
 .../operators/CombineTaskExternalITCase.java    |  87 +++-
 .../runtime/operators/CombineTaskTest.java      | 354 ++++++++--------
 .../operators/CombinerOversizedRecordsTest.java | 236 +++++++++++
 .../operators/testutils/DelayingIterator.java   |  59 +++
 .../testutils/InfiniteIntTupleIterator.java     |  38 ++
 .../runtime/operators/testutils/TestData.java   |   2 +-
 .../testutils/UnaryOperatorTestBase.java        | 410 +++++++++++++++++++
 .../UniformIntStringTupleGenerator.java         |  77 ++++
 .../testutils/UniformIntTupleGenerator.java     |  75 ++++
 .../operators/testutils/UnionIterator.java      |  16 +-
 11 files changed, 1199 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index c426295..2bf778e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -16,13 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -33,10 +29,15 @@ import 
org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
 import org.apache.flink.runtime.operators.sort.InMemorySorter;
 import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
 import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -44,11 +45,12 @@ import java.util.List;
  * the user supplied a RichGroupReduceFunction with a combine method. The 
combining is performed in memory with a
  * lazy approach which only combines elements which currently fit in the 
sorter. This may lead to a partial solution.
  * In the case of the RichGroupReduceFunction this partial result is 
transformed into a proper deterministic result.
- * The CombineGroup uses the GroupCombineFunction interface which allows to 
combine values of type <IN> to any type
- * of type <OUT>. In contrast, the RichGroupReduceFunction requires the 
combine method to have the same input and
- * output type to be able to reduce the elements after the combine from <IN> 
to <OUT>.
+ * The CombineGroup uses the GroupCombineFunction interface which allows to 
combine values of type {@code IN} 
+ * to any type of type {@code OUT}. In contrast, the RichGroupReduceFunction 
requires the combine method
+ * to have the same input and output type to be able to reduce the elements 
after the combine from 
+ * {@code IN} to {@code OUT}.
  *
- * The CombineTask uses a combining iterator over its input. The output of the 
iterator is emitted.
+ * <p>The CombineTask uses a combining iterator over its input. The output of 
the iterator is emitted.</p>
  * 
  * @param <IN> The data type consumed by the combiner.
  * @param <OUT> The data type produced by the combiner.
@@ -67,8 +69,6 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<GroupCombin
        private GroupCombineFunction<IN, OUT> combiner;
 
        private TypeSerializer<IN> serializer;
-
-       private TypeComparator<IN> sortingComparator;
        
        private TypeComparator<IN> groupingComparator;
 
@@ -78,7 +78,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<GroupCombin
 
        private Collector<OUT> output;
 
-       private long oversizedRecordCount = 0L;
+       private long oversizedRecordCount;
 
        private volatile boolean running = true;
 
@@ -112,9 +112,8 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<GroupCombin
        @Override
        public void prepare() throws Exception {
                final DriverStrategy driverStrategy = 
this.taskContext.getTaskConfig().getDriverStrategy();
-               if(driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
-                       throw new Exception("Invalid strategy " + 
driverStrategy + " for " +
-                                       "group reduce combinder.");
+               if (driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
+                       throw new Exception("Invalid strategy " + 
driverStrategy + " for group reduce combiner.");
                }
 
                this.memManager = this.taskContext.getMemoryManager();
@@ -122,7 +121,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<GroupCombin
 
                final TypeSerializerFactory<IN> serializerFactory = 
this.taskContext.getInputSerializer(0);
                this.serializer = serializerFactory.getSerializer();
-               this.sortingComparator = 
this.taskContext.getDriverComparator(0);
+
+               final TypeComparator<IN> sortingComparator = 
this.taskContext.getDriverComparator(0);
+               
                this.groupingComparator = 
this.taskContext.getDriverComparator(1);
                this.combiner = this.taskContext.getStub();
                this.output = this.taskContext.getOutputCollector();
@@ -131,12 +132,12 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<GroupCombin
                                numMemoryPages);
 
                // instantiate a fix-length in-place sorter, if possible, 
otherwise the out-of-place sorter
-               if 
(this.sortingComparator.supportsSerializationWithKeyNormalization() &&
+               if 
(sortingComparator.supportsSerializationWithKeyNormalization() &&
                                this.serializer.getLength() > 0 && 
this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
                {
-                       this.sorter = new 
FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
+                       this.sorter = new 
FixedLengthRecordSorter<IN>(this.serializer, sortingComparator, memory);
                } else {
-                       this.sorter = new 
NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), 
memory);
+                       this.sorter = new 
NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), memory);
                }
 
                ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
@@ -171,10 +172,14 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<GroupCombin
 
                        // write the value again
                        if (!this.sorter.write(value)) {
+                               
                                ++oversizedRecordCount;
-                               LOG.debug("Cannot write record to fresh sort 
buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
-                               // simply forward the record
-                               this.output.collect((OUT)value);
+                               LOG.debug("Cannot write record to fresh sort 
buffer, record is too large. " +
+                                                               "Oversized 
record count: {}", oversizedRecordCount);
+                               
+                               // simply forward the record. We need to pass 
it through the combine function to convert it
+                               Iterable<IN> input = 
Collections.singleton(value);
+                               this.combiner.combine(input, this.output);
                        }
                }
 
@@ -210,7 +215,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<GroupCombin
 
        @Override
        public void cleanup() throws Exception {
-               if(this.sorter != null) {
+               if (this.sorter != null) {
                        this.memManager.release(this.sorter.dispose());
                }
        }
@@ -218,8 +223,17 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<GroupCombin
        @Override
        public void cancel() {
                this.running = false;
-               if(this.sorter != null) {
+               if (this.sorter != null) {
                        this.memManager.release(this.sorter.dispose());
                }
        }
+
+       /**
+        * Gets the number of oversized records handled by this combiner.
+        * 
+        * @return The number of oversized records handled by this combiner.
+        */
+       public long getOversizedRecordCount() {
+               return oversizedRecordCount;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index d957fa1..4905e57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -16,17 +16,18 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import 
org.apache.flink.runtime.operators.CombineTaskTest.MockCombiningReduceStub;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
@@ -45,7 +46,7 @@ public class CombineTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFun
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ 
IntValue.class });
+               new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ 
IntValue.class });
 
        public CombineTaskExternalITCase(ExecutionConfig config) {
                super(config, COMBINE_MEM, 0);
@@ -161,4 +162,84 @@ public class CombineTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFun
                
                this.outList.clear();
        }
+       
+       // 
------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
+
+       @ReduceOperator.Combinable
+       public static class MockCombiningReduceStub extends 
RichGroupReduceFunction<Record, Record> {
+               private static final long serialVersionUID = 1L;
+
+               private final IntValue theInteger = new IntValue();
+
+               @Override
+               public void reduce(Iterable<Record> records, Collector<Record> 
out) {
+                       Record element = null;
+                       int sum = 0;
+
+                       for (Record next : records) {
+                               element = next;
+                               element.getField(1, this.theInteger);
+
+                               sum += this.theInteger.getValue();
+                       }
+                       this.theInteger.setValue(sum);
+                       element.setField(1, this.theInteger);
+                       out.collect(element);
+               }
+
+               @Override
+               public void combine(Iterable<Record> records, Collector<Record> 
out) throws Exception {
+                       reduce(records, out);
+               }
+       }
+
+       @ReduceOperator.Combinable
+       public static final class MockFailingCombiningReduceStub extends 
RichGroupReduceFunction<Record, Record> {
+               private static final long serialVersionUID = 1L;
+
+               private int cnt = 0;
+
+               private final IntValue key = new IntValue();
+               private final IntValue value = new IntValue();
+               private final IntValue combineValue = new IntValue();
+
+               @Override
+               public void reduce(Iterable<Record> records, Collector<Record> 
out) {
+                       Record element = null;
+                       int sum = 0;
+
+                       for (Record next : records) {
+                               element = next;
+                               element.getField(1, this.value);
+
+                               sum += this.value.getValue();
+                       }
+                       element.getField(0, this.key);
+                       this.value.setValue(sum - this.key.getValue());
+                       element.setField(1, this.value);
+                       out.collect(element);
+               }
+
+               @Override
+               public void combine(Iterable<Record> records, Collector<Record> 
out) {
+                       Record element = null;
+                       int sum = 0;
+
+                       for (Record next : records) {
+                               element = next;
+                               element.getField(1, this.combineValue);
+
+                               sum += this.combineValue.getValue();
+                       }
+
+                       if (++this.cnt >= 10) {
+                               throw new ExpectedTestException();
+                       }
+
+                       this.combineValue.setValue(sum);
+                       element.setField(1, this.combineValue);
+                       out.collect(element);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 7772151..932e746 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -18,254 +18,244 @@
 
 package org.apache.flink.runtime.operators;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.operators.testutils.*;
-import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.operators.testutils.DelayingIterator;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.InfiniteIntTupleIterator;
+import org.apache.flink.runtime.operators.testutils.UnaryOperatorTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+
 import org.junit.Test;
 
-public class CombineTaskTest extends 
DriverTestBase<RichGroupReduceFunction<Record, ?>>
-{
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+public class CombineTaskTest
+               extends 
UnaryOperatorTestBase<RichGroupReduceFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>>, 
+               Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+       
        private static final long COMBINE_MEM = 3 * 1024 * 1024;
 
        private final double combine_frac;
        
-       private final ArrayList<Record> outList = new ArrayList<Record>();
+       private final ArrayList<Tuple2<Integer, Integer>> outList = new 
ArrayList<Tuple2<Integer, Integer>>();
+
+       private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new 
TupleSerializer<Tuple2<Integer, Integer>>(
+                       (Class<Tuple2<Integer, Integer>>) (Class<?>) 
Tuple2.class,
+                       new TypeSerializer<?>[] { IntSerializer.INSTANCE, 
IntSerializer.INSTANCE });
        
-       @SuppressWarnings("unchecked")
-       private final RecordComparator comparator = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ 
IntValue.class });
+       private final TypeComparator<Tuple2<Integer, Integer>> comparator = new 
TupleComparator<Tuple2<Integer, Integer>>(
+                       new int[]{0},
+                       new TypeComparator<?>[] { new IntComparator(true) },
+                       new TypeSerializer<?>[] { IntSerializer.INSTANCE });
 
+       
        public CombineTaskTest(ExecutionConfig config) {
                super(config, COMBINE_MEM, 0);
 
-               combine_frac = 
(double)COMBINE_MEM/this.getMemoryManager().getMemorySize();
+               combine_frac = (double)COMBINE_MEM / 
this.getMemoryManager().getMemorySize();
        }
        
+       
        @Test
        public void testCombineTask() {
-               int keyCnt = 100;
-               int valCnt = 20;
-               
-               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-               addDriverComparator(this.comparator);
-               addDriverComparator(this.comparator);
-               setOutput(this.outList);
-
-               
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-               getTaskConfig().setRelativeMemoryDriver(combine_frac);
-               getTaskConfig().setFilehandlesDriver(2);
-               
-               final GroupReduceCombineDriver<Record, Record> testTask = new 
GroupReduceCombineDriver<Record, Record>();
-               
                try {
+                       int keyCnt = 100;
+                       int valCnt = 20;
+                       
+                       setInput(new UniformIntTupleGenerator(keyCnt, valCnt, 
false), serializer);
+                       addDriverComparator(this.comparator);
+                       addDriverComparator(this.comparator);
+                       setOutput(this.outList, serializer);
+       
+                       
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+                       getTaskConfig().setRelativeMemoryDriver(combine_frac);
+                       getTaskConfig().setFilehandlesDriver(2);
+                       
+                       final GroupReduceCombineDriver<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> testTask =
+                                       new 
GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+                       
                        testDriver(testTask, MockCombiningReduceStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Invoke method caused exception.");
-               }
-               
-               int expSum = 0;
-               for (int i = 1;i < valCnt; i++) {
-                       expSum += i;
-               }
-               
-               Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+keyCnt, this.outList.size() == keyCnt);
-               
-               for(Record record : this.outList) {
-                       Assert.assertTrue("Incorrect result", 
record.getField(1, IntValue.class).getValue() == expSum);
+                       
+                       int expSum = 0;
+                       for (int i = 1;i < valCnt; i++) {
+                               expSum += i;
+                       }
+                       
+                       assertTrue(this.outList.size() == keyCnt);
+                       
+                       for (Tuple2<Integer, Integer> record : this.outList) {
+                               assertTrue(record.f1 == expSum);
+                       }
+                       
+                       this.outList.clear();
                }
-               
-               this.outList.clear();
-       }
-
-       @Test
-       public void testOversizedRecordCombineTask() {
-               int tenMil = 10000000;
-               Generator g = new Generator(561349061987311L, 1, tenMil);
-               //generate 10 records each of size 10MB
-               final TestData.GeneratorIterator gi = new 
TestData.GeneratorIterator(g, 10);
-               List<MutableObjectIterator<Record>> inputs = new 
ArrayList<MutableObjectIterator<Record>>();
-               inputs.add(gi);
-
-               addInput(new UnionIterator<Record>(inputs));
-               addDriverComparator(this.comparator);
-               addDriverComparator(this.comparator);
-               setOutput(this.outList);
-
-               
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-               getTaskConfig().setRelativeMemoryDriver(combine_frac);
-               getTaskConfig().setFilehandlesDriver(2);
-
-               final GroupReduceCombineDriver<Record, Record> testTask = new 
GroupReduceCombineDriver<Record, Record>();
-
-               try {
-                       testDriver(testTask, MockCombiningReduceStub.class);
-               } catch (Exception e) {
+               catch (Exception e) {
                        e.printStackTrace();
-                       Assert.fail("Invoke method caused exception.");
+                       fail(e.getMessage());
                }
-
-               Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+10, this.outList.size() == 10);
-
-               this.outList.clear();
        }
 
        @Test
        public void testFailingCombineTask() {
-               int keyCnt = 100;
-               int valCnt = 20;
-               
-               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-               addDriverComparator(this.comparator);
-               addDriverComparator(this.comparator);
-               setOutput(new DiscardingOutputCollector<Record>());
-               
-               
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-               getTaskConfig().setRelativeMemoryDriver(combine_frac);
-               getTaskConfig().setFilehandlesDriver(2);
-               
-               final GroupReduceCombineDriver<Record, Record> testTask = new 
GroupReduceCombineDriver<Record, Record>();
-               
                try {
-                       testDriver(testTask, 
MockFailingCombiningReduceStub.class);
-                       Assert.fail("Exception not forwarded.");
-               } catch (ExpectedTestException etex) {
-                       // good!
-               } catch (Exception e) {
+                       int keyCnt = 100;
+                       int valCnt = 20;
+                       
+                       setInput(new UniformIntTupleGenerator(keyCnt, valCnt, 
false), serializer);
+                       addDriverComparator(this.comparator);
+                       addDriverComparator(this.comparator);
+                       setOutput(new DiscardingOutputCollector<Tuple2<Integer, 
Integer>>());
+                       
+                       
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+                       getTaskConfig().setRelativeMemoryDriver(combine_frac);
+                       getTaskConfig().setFilehandlesDriver(2);
+                       
+                       final GroupReduceCombineDriver<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> testTask = 
+                                       new 
GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+                       
+                       try {
+                               testDriver(testTask, 
MockFailingCombiningReduceStub.class);
+                               fail("Exception not forwarded.");
+                       }
+                       catch (ExpectedTestException etex) {
+                               // good!
+                       }
+               }
+               catch (Exception e) {
                        e.printStackTrace();
-                       Assert.fail("Test failed due to an exception.");
+                       fail(e.getMessage());
                }
        }
 
        @Test
-       public void testCancelCombineTaskSorting()
-       {
-               addInput(new DelayingInfinitiveInputIterator(100));
-               addDriverComparator(this.comparator);
-               addDriverComparator(this.comparator);
-               setOutput(new DiscardingOutputCollector<Record>());
-               
-               
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-               getTaskConfig().setRelativeMemoryDriver(combine_frac);
-               getTaskConfig().setFilehandlesDriver(2);
-               
-               final GroupReduceCombineDriver<Record, Record> testTask = new 
GroupReduceCombineDriver<Record, Record>();
-               
-               final AtomicBoolean success = new AtomicBoolean(false);
-               
-               Thread taskRunner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       testDriver(testTask, 
MockFailingCombiningReduceStub.class);
-                                       success.set(true);
-                               } catch (Exception ie) {
-                                       ie.printStackTrace();
-                               }
-                       }
-               };
-               taskRunner.start();
-               
-               TaskCancelThread tct = new TaskCancelThread(1, taskRunner, 
this);
-               tct.start();
-               
+       public void testCancelCombineTaskSorting()  {
                try {
-                       tct.join();
-                       taskRunner.join();              
-               } catch(InterruptedException ie) {
-                       Assert.fail("Joining threads failed");
+                       MutableObjectIterator<Tuple2<Integer, Integer>> 
slowInfiniteInput =
+                                       new DelayingIterator<Tuple2<Integer, 
Integer>>(new InfiniteIntTupleIterator(), 1);
+                       
+                       setInput(slowInfiniteInput, serializer);
+                       addDriverComparator(this.comparator);
+                       addDriverComparator(this.comparator);
+                       setOutput(new DiscardingOutputCollector<Tuple2<Integer, 
Integer>>());
+                       
+                       
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+                       getTaskConfig().setRelativeMemoryDriver(combine_frac);
+                       getTaskConfig().setFilehandlesDriver(2);
+                       
+                       final GroupReduceCombineDriver<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> testTask = 
+                                       new 
GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+                       
+                       Thread taskRunner = new Thread() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               testDriver(testTask, 
MockFailingCombiningReduceStub.class);
+                                       }
+                                       catch (Exception e) {
+                                               // exceptions may happen during 
canceling
+                                       }
+                               }
+                       };
+                       taskRunner.start();
+                       
+                       // give the task some time
+                       Thread.sleep(500);
+                       
+                       // cancel
+                       testTask.cancel();
+                       
+                       // make sure it reacts to the canceling in some time
+                       taskRunner.join(5000);
+                       
+                       assertFalse("Task did not cancel properly within in 5 
seconds.", taskRunner.isAlive());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
                }
-               
-               Assert.assertTrue("Exception was thrown despite proper 
canceling.", success.get());
        }
        
-       @Combinable
-       public static class MockCombiningReduceStub extends 
RichGroupReduceFunction<Record, Record> {
+       // 
------------------------------------------------------------------------
+       //  Test Combiners
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Combinable test function that sums up the second tuple field (f1) of a group.
+        *
+        * <p>Each call emits exactly one new tuple: its first field is the f0 of the last
+        * record iterated (0 if the group had no records) and its second field is the sum
+        * of all f1 values. Combining is implemented by delegating to {@code reduce}.
+        */
+       @RichGroupReduceFunction.Combinable
+       public static class MockCombiningReduceStub extends 
+                       RichGroupReduceFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>>
+       {
                private static final long serialVersionUID = 1L;
-               
-               private final IntValue theInteger = new IntValue();
 
                @Override
-               public void reduce(Iterable<Record> records, Collector<Record> 
out) {
-                       Record element = null;
+               public void reduce(Iterable<Tuple2<Integer, Integer>> records, 
Collector<Tuple2<Integer, Integer>> out) {
+                       int key = 0;
                        int sum = 0;
-                       
-                       for (Record next : records) {
-                               element = next;
-                               element.getField(1, this.theInteger);
-                               
-                               sum += this.theInteger.getValue();
+
+                       for (Tuple2<Integer, Integer> next : records) {
+                               key = next.f0;
+                               sum += next.f1;
                        }
-                       this.theInteger.setValue(sum);
-                       element.setField(1, this.theInteger);
-                       out.collect(element);
+                       
+                       out.collect(new Tuple2<Integer, Integer>(key, sum));
                }
                
                @Override
-               public void combine(Iterable<Record> records, Collector<Record> 
out) throws Exception {
+               public void combine(Iterable<Tuple2<Integer, Integer>> records, 
Collector<Tuple2<Integer, Integer>> out) {
+                       // pre-aggregation is the same summation as the final reduce
                        reduce(records, out);
                }
        }
        
-       @Combinable
-       public static final class MockFailingCombiningReduceStub extends 
RichGroupReduceFunction<Record, Record> {
+       /**
+        * Combinable test function whose combiner starts failing after a few invocations.
+        *
+        * <p>{@code reduce} and {@code combine} both sum the f1 values of a group and emit one
+        * new tuple {@code (key, sum - key)}, where key is the f0 of the last record iterated
+        * (0 if the group had no records). {@code combine} additionally counts its invocations
+        * and throws an {@code ExpectedTestException} on the tenth call and on every call
+        * after that, before anything is emitted for that group.
+        */
+       @RichGroupReduceFunction.Combinable
+       public static final class MockFailingCombiningReduceStub extends 
+                       RichGroupReduceFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>>
+       {
                private static final long serialVersionUID = 1L;
                
-               private int cnt = 0;
-               
-               private final IntValue key = new IntValue();
-               private final IntValue value = new IntValue();
-               private final IntValue combineValue = new IntValue();
+               private int cnt;
 
                @Override
-               public void reduce(Iterable<Record> records, Collector<Record> 
out) {
-                       Record element = null;
+               public void reduce(Iterable<Tuple2<Integer, Integer>> records, 
Collector<Tuple2<Integer, Integer>> out) {
+                       int key = 0;
                        int sum = 0;
                        
-                       for (Record next : records) {
-                               element = next;
-                               element.getField(1, this.value);
-                               
-                               sum += this.value.getValue();
+                       for (Tuple2<Integer, Integer> next : records) {
+                               key = next.f0;
+                               sum += next.f1;
                        }
-                       element.getField(0, this.key);
-                       this.value.setValue(sum - this.key.getValue());
-                       element.setField(1, this.value);
-                       out.collect(element);
+                       
+                       int resultValue = sum - key;
+                       out.collect(new Tuple2<Integer, Integer>(key, 
resultValue));
                }
                
                @Override
-               public void combine(Iterable<Record> records, Collector<Record> 
out) {
-                       Record element = null;
+               public void combine(Iterable<Tuple2<Integer, Integer>> records, 
Collector<Tuple2<Integer, Integer>> out) {
+                       int key = 0;
                        int sum = 0;
-                       
-                       for (Record next : records) {
-                               element = next;
-                               element.getField(1, this.combineValue);
-                               
-                               sum += this.combineValue.getValue();
+
+                       for (Tuple2<Integer, Integer> next : records) {
+                               key = next.f0;
+                               sum += next.f1;
                        }
                        
+                       // the counter is incremented first, so calls 1..9 succeed and call 10 onwards fails
                        if (++this.cnt >= 10) {
                                throw new ExpectedTestException();
                        }
-                       
-                       this.combineValue.setValue(sum);
-                       element.setField(1, this.combineValue);
-                       out.collect(element);
+
+                       int resultValue = sum - key;
+                       out.collect(new Tuple2<Integer, Integer>(key, 
resultValue));
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java
new file mode 100644
index 0000000..58d1676
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.operators.testutils.UnaryOperatorTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test that checks how the combiner handles very large records that are too large to be written into
+ * a fresh sort buffer.
+ *
+ * <p>The input is the union of three single oversized records (key -1, each carrying the same long
+ * string) interleaved with two runs of small generated records.
+ */
+public class CombinerOversizedRecordsTest
+               extends UnaryOperatorTestBase<
+                       GroupCombineFunction<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>>,
+                       Tuple3<Integer, Integer, String>,
+                       Tuple3<Integer, Double, String>> {
+
+       /** Memory (in bytes) handed to the test base; also used to derive the driver's memory fraction. */
+       private static final long COMBINE_MEM = 3 * 1024 * 1024;
+
+       /** Number of characters in the string payload of the oversized records. */
+       private static final int LONG_STRING_LENGTH = 10000000;
+
+       /** Fixed seed for the payload generator, so that every run works on the same data. */
+       private static final long PAYLOAD_SEED = 0x5EEDL;
+
+       /** Share of the memory manager's memory that is configured for the combine driver. */
+       private final double combineFraction;
+
+       /** Receives everything the combiner emits. */
+       private final ArrayList<Tuple3<Integer, Double, String>> outList =
+                       new ArrayList<Tuple3<Integer, Double, String>>();
+
+       // the cast through Class<?> is needed because there is no class literal for a parameterized type
+       @SuppressWarnings("unchecked")
+       private final TypeSerializer<Tuple3<Integer, Integer, String>> serializer =
+                       new TupleSerializer<Tuple3<Integer, Integer, String>>(
+                               (Class<Tuple3<Integer, Integer, String>>) (Class<?>) Tuple3.class,
+                               new TypeSerializer<?>[] {
+                                       IntSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+
+       @SuppressWarnings("unchecked")
+       private final TypeSerializer<Tuple3<Integer, Double, String>> outSerializer =
+                       new TupleSerializer<Tuple3<Integer, Double, String>>(
+                               (Class<Tuple3<Integer, Double, String>>) (Class<?>) Tuple3.class,
+                               new TypeSerializer<?>[] {
+                                       IntSerializer.INSTANCE, DoubleSerializer.INSTANCE, StringSerializer.INSTANCE });
+
+       /** Comparator over key position 0, i.e. the first (integer) field of the input tuples. */
+       private final TypeComparator<Tuple3<Integer, Integer, String>> comparator =
+                       new TupleComparator<Tuple3<Integer, Integer, String>>(
+                               new int[] { 0 },
+                               new TypeComparator<?>[] { new IntComparator(true) },
+                               new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+
+       // ------------------------------------------------------------------------
+
+       public CombinerOversizedRecordsTest(ExecutionConfig config) {
+               super(config, COMBINE_MEM, 0);
+               this.combineFraction = (double) COMBINE_MEM / getMemoryManager().getMemorySize();
+       }
+
+       /**
+        * Runs the combine driver over the interleaved input and checks that exactly the three
+        * oversized records were counted as oversized and that the number of results is plausible.
+        */
+       @Test
+       public void testOversizedRecordCombineTask() {
+               try {
+                       final int keyCnt = 100;
+                       final int valCnt = 20;
+
+                       // create a long heavy string payload
+                       final String longString = createRandomString(LONG_STRING_LENGTH, PAYLOAD_SEED);
+
+                       // construct the input as a union of
+                       // 1) long string
+                       // 2) some random values
+                       // 3) long string
+                       // 4) random values
+                       // 5) long string
+
+                       // random values 1
+                       MutableObjectIterator<Tuple2<Integer, Integer>> gen1 =
+                                       new UniformIntTupleGenerator(keyCnt, valCnt, false);
+
+                       // random values 2
+                       MutableObjectIterator<Tuple2<Integer, Integer>> gen2 =
+                                       new UniformIntTupleGenerator(keyCnt, valCnt, false);
+
+                       @SuppressWarnings("unchecked")
+                       MutableObjectIterator<Tuple3<Integer, Integer, String>> input =
+                                       new UnionIterator<Tuple3<Integer, Integer, String>>(
+                                                       oversizedRecord(longString),
+                                                       new StringIteratorDecorator(gen1),
+                                                       oversizedRecord(longString),
+                                                       new StringIteratorDecorator(gen2),
+                                                       oversizedRecord(longString));
+
+                       setInput(input, serializer);
+                       // the same comparator is registered twice, exactly as many driver comparators as before
+                       addDriverComparator(this.comparator);
+                       addDriverComparator(this.comparator);
+                       setOutput(this.outList, this.outSerializer);
+
+                       getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+                       getTaskConfig().setRelativeMemoryDriver(combineFraction);
+                       getTaskConfig().setFilehandlesDriver(2);
+
+                       GroupReduceCombineDriver<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>> testTask =
+                                       new GroupReduceCombineDriver<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>>();
+
+                       testDriver(testTask, TestCombiner.class);
+
+                       assertEquals("Wrong number of oversized records.", 3, testTask.getOversizedRecordCount());
+
+                       // three oversized records plus either keyCnt or 2 * keyCnt combined small records
+                       // (presumably depending on whether the two generated runs were combined together
+                       // or separately - TODO confirm against the driver)
+                       final int numResults = outList.size();
+                       assertTrue("Unexpected number of results: " + numResults,
+                                       numResults == keyCnt + 3 || numResults == 2 * keyCnt + 3);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Builds a string of the given length out of lowercase letters 'a' to 'z', drawn from a
+        * random generator that is initialized with the given seed.
+        */
+       private static String createRandomString(int length, long seed) {
+               final Random rnd = new Random(seed);
+               final StringBuilder bld = new StringBuilder(length);
+
+               for (int i = 0; i < length; i++) {
+                       bld.append((char) (rnd.nextInt(26) + 'a'));
+               }
+               return bld.toString();
+       }
+
+       /** Creates an iterator that returns one single record (-1, -1, payload). */
+       private static SingleValueIterator<Tuple3<Integer, Integer, String>> oversizedRecord(String payload) {
+               return new SingleValueIterator<Tuple3<Integer, Integer, String>>(
+                               new Tuple3<Integer, Integer, String>(-1, -1, payload));
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Combiner that sums the second field of a group and emits one tuple holding the key and
+        * the string of the last record iterated, together with the sum as a double.
+        */
+       public static final class TestCombiner
+                       implements GroupCombineFunction<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>>
+       {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void combine(Iterable<Tuple3<Integer, Integer, String>> values,
+                                                       Collector<Tuple3<Integer, Double, String>> out)
+               {
+                       int key = 0;
+                       int sum = 0;
+                       String someString = null;
+
+                       for (Tuple3<Integer, Integer, String> next : values) {
+                               key = next.f0;
+                               sum += next.f1;
+                               someString = next.f2;
+                       }
+
+                       out.collect(new Tuple3<Integer, Double, String>(key, (double) sum, someString));
+               }
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Turns an iterator over integer pairs into an iterator over triples by attaching the
+        * constant string "test string" to every element.
+        */
+       private static class StringIteratorDecorator implements MutableObjectIterator<Tuple3<Integer, Integer, String>> {
+
+               private final MutableObjectIterator<Tuple2<Integer, Integer>> input;
+
+               private StringIteratorDecorator(MutableObjectIterator<Tuple2<Integer, Integer>> input) {
+                       this.input = input;
+               }
+
+               @Override
+               public Tuple3<Integer, Integer, String> next(Tuple3<Integer, Integer, String> reuse) throws IOException {
+                       final Tuple2<Integer, Integer> next = input.next();
+                       if (next == null) {
+                               return null;
+                       }
+
+                       // fill the reuse object rather than allocating a new tuple
+                       reuse.f0 = next.f0;
+                       reuse.f1 = next.f1;
+                       reuse.f2 = "test string";
+                       return reuse;
+               }
+
+               @Override
+               public Tuple3<Integer, Integer, String> next() throws IOException {
+                       final Tuple2<Integer, Integer> next = input.next();
+                       if (next == null) {
+                               return null;
+                       }
+                       return new Tuple3<Integer, Integer, String>(next.f0, next.f1, "test string");
+               }
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Iterator that returns the given value exactly once and null on every later call.
+        * The reuse object is ignored; the very same value instance is handed out.
+        */
+       private static class SingleValueIterator<T> implements MutableObjectIterator<T> {
+
+               private final T value;
+
+               private boolean pending = true;
+
+               private SingleValueIterator(T value) {
+                       this.value = value;
+               }
+
+               @Override
+               public T next(T reuse) {
+                       return next();
+               }
+
+               @Override
+               public T next() {
+                       if (!pending) {
+                               return null;
+                       }
+                       pending = false;
+                       return value;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java
new file mode 100644
index 0000000..b3d53c7
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+
+/**
+ * Iterator wrapper that sleeps for a fixed number of milliseconds before it fetches each
+ * element from the wrapped iterator.
+ *
+ * <p>If the sleep is interrupted, the interrupted flag of the thread is set again and the
+ * element is fetched right away.
+ */
+public class DelayingIterator<T> implements MutableObjectIterator<T> {
+
+       private final MutableObjectIterator<T> iterator;
+       private final int delay;
+
+       /**
+        * @param iterator The iterator to fetch the elements from.
+        * @param delay The time to sleep before each element, as passed to {@code Thread.sleep}.
+        */
+       public DelayingIterator(MutableObjectIterator<T> iterator, int delay) {
+               this.iterator = iterator;
+               this.delay = delay;
+       }
+
+       @Override
+       public T next(T reuse) throws IOException {
+               sleepForDelay();
+               return iterator.next(reuse);
+       }
+
+       @Override
+       public T next() throws IOException {
+               sleepForDelay();
+               return iterator.next();
+       }
+
+       /** Blocks for the configured delay, keeping the interrupted state if the sleep is cut short. */
+       private void sleepForDelay() {
+               try {
+                       Thread.sleep(delay);
+               }
+               catch (InterruptedException e) {
+                       // ignore, but restore interrupted state
+                       Thread.currentThread().interrupt();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java
new file mode 100644
index 0000000..ba2181b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * An iterator that never ends: every call hands out a newly created (0, 0) tuple.
+ */
+public class InfiniteIntTupleIterator implements MutableObjectIterator<Tuple2<Integer, Integer>> {
+
+       /**
+        * Returns a fresh (0, 0) tuple; the given reuse object is not touched.
+        */
+       @Override
+       public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> reuse) {
+               return next();
+       }
+
+       /**
+        * Returns a fresh (0, 0) tuple, never null.
+        */
+       @Override
+       public Tuple2<Integer, Integer> next() {
+               final Tuple2<Integer, Integer> zeros = new Tuple2<Integer, Integer>(0, 0);
+               return zeros;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index 400e798..fd34a3b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -226,7 +226,7 @@ public final class TestData {
                                length = valueLength - 
random.nextInt(valueLength / 3);
                        }
 
-                       StringBuffer sb = new StringBuffer();
+                       StringBuilder sb = new StringBuilder();
                        for (int i = 0; i < length; i++) {
                                sb.append(alpha[random.nextInt(alpha.length)]);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
new file mode 100644
index 0000000..1e25bab
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.PactTaskContext;
+import org.apache.flink.runtime.operators.ResettablePactDriver;
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements 
PactTaskContext<S, OUT> {
+       
+       protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
+       
+       protected static final int PAGE_SIZE = 32 * 1024; 
+       
+       private final IOManager ioManager;
+       
+       private final MemoryManager memManager;
+       
+       private MutableObjectIterator<IN> input;
+       
+       private TypeSerializer<IN> inputSerializer;
+       
+       private List<TypeComparator<IN>> comparators;
+       
+       private UnilateralSortMerger<IN> sorter;
+       
+       private final AbstractInvokable owner;
+
+       private final TaskConfig taskConfig;
+       
+       protected final long perSortMem;
+
+       protected final double perSortFractionMem;
+       
+       private Collector<OUT> output;
+       
+       protected int numFileHandles;
+       
+       private S stub;
+       
+       private PactDriver<S, OUT> driver;
+       
+       private volatile boolean running;
+
+       private ExecutionConfig executionConfig;
+       
+       /**
+        * Creates the test base with {@code DEFAULT_PER_SORT_MEM} as the memory per sorter.
+        *
+        * @param executionConfig The execution config to run the test with.
+        * @param memory The amount of memory (in bytes) besides the sorter memory.
+        * @param maxNumSorters The number of sorters to reserve memory for.
+        */
+       protected UnaryOperatorTestBase(ExecutionConfig executionConfig, long 
memory, int maxNumSorters) {
+               this(executionConfig, memory, maxNumSorters, 
DEFAULT_PER_SORT_MEM);
+       }
+       
+       /**
+        * Creates the test base with an I/O manager and, if any memory is requested, a memory
+        * manager over {@code memory + maxNumSorters * perSortMemory} bytes.
+        *
+        * @param executionConfig The execution config to run the test with.
+        * @param memory The amount of memory (in bytes) besides the sorter memory.
+        * @param maxNumSorters The number of sorters to reserve memory for.
+        * @param perSortMemory The amount of memory (in bytes) reserved per sorter.
+        *
+        * @throws IllegalArgumentException Thrown, if any of the numeric arguments is negative.
+        */
+       protected UnaryOperatorTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters, long perSortMemory) {
+               if (memory < 0 || maxNumSorters < 0 || perSortMemory < 0) {
+                       throw new IllegalArgumentException("Memory sizes and number of sorters must not be negative:"
+                                       + " memory=" + memory + ", maxNumSorters=" + maxNumSorters
+                                       + ", perSortMemory=" + perSortMemory);
+               }
+
+               // all arguments are known to be non-negative here
+               final long totalMem = memory + (maxNumSorters * perSortMemory);
+
+               this.perSortMem = perSortMemory;
+               // without any memory there is no meaningful fraction; avoid NaN / Infinity from dividing by zero
+               this.perSortFractionMem = totalMem > 0 ? (double) perSortMemory / totalMem : 0.0;
+               this.ioManager = new IOManagerAsync();
+               // no memory manager is created when no memory was requested
+               this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem, 1) : null;
+               this.owner = new DummyInvokable();
+
+               Configuration config = new Configuration();
+               this.taskConfig = new TaskConfig(config);
+
+               this.executionConfig = executionConfig;
+               this.comparators = new ArrayList<TypeComparator<IN>>(2);
+       }
+
+       /**
+        * Supplies the constructor arguments for the parameterized runs: first an execution
+        * config with object reuse disabled, then one with object reuse enabled.
+        */
+       @Parameterized.Parameters
+       public static Collection<Object[]> getConfigurations() {
+               final ExecutionConfig reuseDisabled = new ExecutionConfig();
+               reuseDisabled.disableObjectReuse();
+
+               final ExecutionConfig reuseEnabled = new ExecutionConfig();
+               reuseEnabled.enableObjectReuse();
+
+               return Arrays.asList(
+                               new Object[] { reuseDisabled },
+                               new Object[] { reuseEnabled });
+       }
+
+       /**
+        * Sets the (unsorted) input of the driver together with its serializer and discards
+        * any sorter that was set before.
+        *
+        * @param input The iterator that supplies the input records.
+        * @param serializer The serializer for the input records.
+        */
+       public void setInput(MutableObjectIterator<IN> input, TypeSerializer<IN> serializer) {
+               // a directly supplied input takes the place of a sorted one
+               this.sorter = null;
+               this.inputSerializer = serializer;
+               this.input = input;
+       }
+       
+       /**
+        * Sets the input of the driver to the given records, routed through a
+        * {@code UnilateralSortMerger}. The direct input is cleared; the sorter is created
+        * right away with this test's memory manager and I/O manager.
+        *
+        * <p>Despite the name, a previously created sorter is replaced, not added to.
+        *
+        * @param input The iterator that supplies the records to sort.
+        * @param serializer The serializer for the input records.
+        * @param comp The comparator that is handed to the sorter.
+        * @throws Exception Declared for errors while the sorter is created.
+        */
+       public void addInputSorted(MutableObjectIterator<IN> input,
+                                                               
TypeSerializer<IN> serializer,
+                                                               
TypeComparator<IN> comp) throws Exception
+       {
+               this.input = null;
+               this.inputSerializer = serializer;
+               // NOTE(review): 32 and 0.8f are presumably the max fan-in and the spilling
+               // threshold of the sorter - confirm against UnilateralSortMerger
+               this.sorter = new UnilateralSortMerger<IN>(
+                               this.memManager, this.ioManager, input, 
this.owner,
+                               this.<IN>getInputSerializer(0),
+                               comp,
+                               this.perSortFractionMem, 32, 0.8f);
+       }
+       
+       /**
+        * Appends the given comparator to the list of comparators kept for the driver.
+        * Calling this repeatedly with the same comparator adds it once per call.
+        */
+       public void addDriverComparator(TypeComparator<IN> comparator) {
+               this.comparators.add(comparator);
+       }
+
+       /**
+        * Sets the collector that receives the output of the driver.
+        */
+       public void setOutput(Collector<OUT> output) {
+               this.output = output;
+       }
+       /**
+        * Sets the output of the driver to a {@code ListOutputCollector} that is created
+        * from the given list and serializer.
+        */
+       public void setOutput(List<OUT> output, TypeSerializer<OUT> 
outSerializer) {
+               this.output = new ListOutputCollector<OUT>(output, 
outSerializer);
+       }
+       
+       /**
+        * Returns the currently stored number of file handles.
+        */
+       public int getNumFileHandlesForSort() {
+               return numFileHandles;
+       }
+
+       
+       /**
+        * Stores the given number of file handles.
+        */
+       public void setNumFileHandlesForSort(int numFileHandles) {
+               this.numFileHandles = numFileHandles;
+       }
+
+       /**
+        * Runs the given driver with a function of the given class. This simply forwards
+        * to {@code testDriverInternal}.
+        */
+       @SuppressWarnings("rawtypes")
+       public void testDriver(PactDriver driver, Class stubClass) throws 
Exception {
+               testDriverInternal(driver, stubClass);
+       }
+
+       /**
+        * Executes the given driver in this test context, step by step: set up the driver,
+        * instantiate the function via its no-argument constructor, prepare the driver, open
+        * the function, run the driver, close the function and finally close the output.
+        * {@code driver.cleanup()} is called in all cases once the driver was set up and the
+        * function was instantiated.
+        *
+        * <p>If an exception occurs, a still open function is closed (errors from that close
+        * are ignored) and a {@code ResettablePactDriver} is torn down. The exception is
+        * rethrown only while the {@code running} flag is still set; otherwise it is dropped,
+        * because the task counts as canceled (the flag is presumably cleared by the cancel
+        * logic outside this method - verify).
+        *
+        * @param driver The driver to execute.
+        * @param stubClass The class of the function to instantiate and run in the driver.
+        * @throws Exception Thrown, if any step fails while the test is not canceled.
+        */
+       @SuppressWarnings({"unchecked","rawtypes"})
+       public void testDriverInternal(PactDriver driver, Class stubClass) 
throws Exception {
+
+               this.driver = driver;
+               driver.setup(this);
+
+               this.stub = (S)stubClass.newInstance();
+
+               // regular running logic
+               this.running = true;
+               boolean stubOpen = false;
+
+               try {
+                       // run the data preparation
+                       try {
+                               driver.prepare();
+                       }
+                       catch (Throwable t) {
+                               throw new Exception("The data preparation 
caused an error: " + t.getMessage(), t);
+                       }
+
+                       // open stub implementation
+                       try {
+                               FunctionUtils.openFunction(this.stub, 
getTaskConfig().getStubParameters());
+                               stubOpen = true;
+                       }
+                       catch (Throwable t) {
+                               throw new Exception("The user defined 'open()' 
method caused an exception: " + t.getMessage(), t);
+                       }
+
+                       // run the user code
+                       driver.run();
+
+                       // close. We close here such that a regular close 
throwing an exception marks a task as failed.
+                       if (this.running) {
+                               FunctionUtils.closeFunction (this.stub);
+                               stubOpen = false;
+                       }
+
+                       this.output.close();
+               }
+               catch (Exception ex) {
+                       // close the input, but do not report any exceptions, 
since we already have another root cause
+                       if (stubOpen) {
+                               try {
+                                       FunctionUtils.closeFunction(this.stub);
+                               }
+                               catch (Throwable t) {
+                                       // ignore
+                               }
+                       }
+
+                       // if resettable driver invoke tear-down
+                       if (this.driver instanceof ResettablePactDriver) {
+                               final ResettablePactDriver<?, ?> resDriver = 
(ResettablePactDriver<?, ?>) this.driver;
+                               try {
+                                       resDriver.teardown();
+                               } catch (Throwable t) {
+                                       throw new Exception("Error while 
shutting down an iterative operator: " + t.getMessage(), t);
+                               }
+                       }
+
+                       // drop exception, if the task was canceled
+                       if (this.running) {
+                               throw ex;
+                       }
+
+               }
+               finally {
+                       driver.cleanup();
+               }
+       }
+
+       /**
+        * Executes a resettable driver for the given number of iterations.
+        *
+        * <p>The driver is set up once, initialized before the first iteration and reset before
+        * every later one. Each iteration is a full {@link #testDriver(PactDriver, Class)} run.
+        * After the last iteration the driver is torn down. If an iteration throws, the
+        * tear-down call in this method is not reached.
+        *
+        * @param driver the resettable driver to execute
+        * @param stubClass the class of the user function to instantiate in every iteration
+        * @param iterations the number of runs; for values {@code <= 0} only setup and tear-down happen
+        * @throws Exception if setup, initialization, reset, any run, or the tear-down fails
+        */
+       @SuppressWarnings({"unchecked","rawtypes"})
+       public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception {
+               driver.setup(this);
+
+               int round = 0;
+               while (round < iterations) {
+                       if (round > 0) {
+                               driver.reset();
+                       }
+                       else {
+                               driver.initialize();
+                       }
+                       testDriver(driver, stubClass);
+                       round++;
+               }
+
+               driver.teardown();
+       }
+       
+       /**
+        * Marks this test context as no longer running and forwards the cancellation to the
+        * driver most recently passed to {@code testDriverInternal}.
+        *
+        * @throws Exception if the driver's {@code cancel()} fails
+        */
+       public void cancel() throws Exception {
+               // clear the flag first so that a failing run sees the cancellation
+               running = false;
+               driver.cancel();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /** Returns the task configuration held by this test context. */
+       @Override
+       public TaskConfig getTaskConfig() {
+               return taskConfig;
+       }
+
+       /** Returns the execution configuration held by this test context. */
+       @Override
+       public ExecutionConfig getExecutionConfig() {
+               return this.executionConfig;
+       }
+       
+       /** Returns the class loader that loaded the runtime class of this test instance. */
+       @Override
+       public ClassLoader getUserCodeClassLoader() {
+               final Class<?> ownClass = getClass();
+               return ownClass.getClassLoader();
+       }
+
+       /** Returns the I/O manager held by this test context. */
+       @Override
+       public IOManager getIOManager() {
+               return ioManager;
+       }
+       
+       /** Returns the memory manager held by this test context. */
+       @Override
+       public MemoryManager getMemoryManager() {
+               return memManager;
+       }
+
+       /**
+        * Returns the input iterator for the driver under test.
+        *
+        * <p>If no input iterator is set yet, the iterator is obtained from the sorter and
+        * cached in {@code this.input}, so later calls return the same iterator. The
+        * {@code index} argument is not inspected.
+        *
+        * @param index the input index (ignored)
+        * @return the input iterator, cast to the requested element type
+        * @throws IllegalStateException if neither an input iterator nor a sorter is available
+        * @throws RuntimeException if the thread is interrupted while obtaining the sorter's
+        *                          iterator; the {@link InterruptedException} is attached as cause
+        */
+       @Override
+       public <X> MutableObjectIterator<X> getInput(int index) {
+               MutableObjectIterator<IN> source = this.input;
+               if (source == null) {
+                       if (this.sorter == null) {
+                               // fail with a clear message rather than a bare NullPointerException
+                               throw new IllegalStateException("Neither an input iterator nor a sorter has been set.");
+                       }
+
+                       // waiting from sorter
+                       try {
+                               source = this.sorter.getIterator();
+                       }
+                       catch (InterruptedException e) {
+                               // keep the original exception as the cause so the stack trace is not lost
+                               // NOTE(review): the interrupt flag is not restored here; confirm that is intended
+                               throw new RuntimeException("Interrupted", e);
+                       }
+                       this.input = source;
+               }
+
+               @SuppressWarnings("unchecked")
+               MutableObjectIterator<X> typedInput = (MutableObjectIterator<X>) source;
+               return typedInput;
+       }
+
+       /**
+        * Returns a serializer factory for the single input of this test context.
+        *
+        * <p>The factory wraps the configured input serializer. Its record class is taken from
+        * an instance that the serializer creates.
+        *
+        * @param index the input index; only {@code 0} is accepted
+        * @return a factory around the input serializer
+        * @throws IllegalArgumentException if {@code index} is not {@code 0}
+        */
+       @Override
+       public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
+               if (index == 0) {
+                       @SuppressWarnings("unchecked")
+                       TypeSerializer<X> typedSerializer = (TypeSerializer<X>) inputSerializer;
+
+                       @SuppressWarnings("unchecked")
+                       Class<X> recordClass = (Class<X>) typedSerializer.createInstance().getClass();
+
+                       return new RuntimeSerializerFactory<X>(typedSerializer, recordClass);
+               }
+
+               throw new IllegalArgumentException();
+       }
+
+       @Override
+       public <X> TypeComparator<X> getDriverComparator(int index) {
+               @SuppressWarnings("unchecked")
+               TypeComparator<X> comparator = (TypeComparator<X>) 
this.comparators.get(index);
+               return comparator;
+       }
+
+       /** Returns the user function instance created by the most recent driver run. */
+       @Override
+       public S getStub() {
+               return stub;
+       }
+
+       /** Returns the collector installed through one of the {@code setOutput} methods. */
+       @Override
+       public Collector<OUT> getOutputCollector() {
+               return output;
+       }
+
+       /** Returns the invokable that acts as the owner of this test context. */
+       @Override
+       public AbstractInvokable getOwningNepheleTask() {
+               return owner;
+       }
+
+       /**
+        * Prefixes a log message with the fixed marker of this test context.
+        *
+        * @param message the message to decorate
+        * @return {@code "Driver Tester: "} followed by the message
+        */
+       @Override
+       public String formatLogString(String message) {
+               final StringBuilder line = new StringBuilder("Driver Tester: ");
+               line.append(message);
+               return line.toString();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Releases the resources of this test context after each test: closes the sorter if one
+        * exists, shuts down the I/O manager and asserts that it shut down properly, then
+        * asserts that the memory manager is empty and shuts it down.
+        *
+        * @throws Exception if closing or shutting down a component fails
+        */
+       @After
+       public void shutdownAll() throws Exception {
+               // the sorter goes first
+               if (sorter != null) {
+                       sorter.close();
+               }
+
+               // next, the I/O manager
+               ioManager.shutdown();
+               final boolean ioCleanlyStopped = ioManager.isProperlyShutDown();
+               Assert.assertTrue("I/O Manager has not properly shut down.", ioCleanlyStopped);
+
+               // finally, check that all memory came back before stopping the memory manager
+               final MemoryManager memoryManager = getMemoryManager();
+               if (memoryManager == null) {
+                       return;
+               }
+               Assert.assertTrue("Memory Manager managed memory was not completely freed.", memoryManager.verifyEmpty());
+               memoryManager.shutdown();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Collector that stores a serializer-made copy of every collected record in a list.
+        */
+       private static final class ListOutputCollector<OUT> implements Collector<OUT> {
+
+               private final List<OUT> output;
+               private final TypeSerializer<OUT> serializer;
+
+               /**
+                * @param outputList the list that receives the copies
+                * @param serializer the serializer used to copy each record
+                */
+               public ListOutputCollector(List<OUT> outputList, TypeSerializer<OUT> serializer) {
+                       this.serializer = serializer;
+                       this.output = outputList;
+               }
+
+               @Override
+               public void collect(OUT record) {
+                       // store a copy, not the passed instance itself
+                       final OUT copy = this.serializer.copy(record);
+                       this.output.add(copy);
+               }
+
+               @Override
+               public void close() {
+                       // nothing to release
+               }
+       }
+       
+       /**
+        * Collector that discards all records and only counts how many were collected.
+        */
+       public static final class CountingOutputCollector<OUT> implements Collector<OUT> {
+
+               private int num;
+
+               @Override
+               public void collect(OUT record) {
+                       // the record itself is not kept
+                       num += 1;
+               }
+
+               @Override
+               public void close() {
+                       // nothing to release
+               }
+
+               /** Returns the number of {@code collect} calls seen so far. */
+               public int getNumberOfRecords() {
+                       return num;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java
new file mode 100644
index 0000000..451cf9e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Test data source that emits every combination of {@code numKeys} integer keys
+ * ({@code 0 .. numKeys-1}) and {@code numVals} values ({@code 0 .. numVals-1}). The value
+ * is emitted as its binary string representation.
+ *
+ * <p>With {@code repeatKey == false}, all keys are emitted for value 0, then all keys for
+ * value 1, and so on. With {@code repeatKey == true}, all values are emitted for key 0,
+ * then all values for key 1, and so on.
+ *
+ * <p>If {@code numKeys} or {@code numVals} is not positive, the sequence is empty.
+ */
+public class UniformIntStringTupleGenerator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+       private final int numKeys;
+       private final int numVals;
+       private final boolean repeatKey;
+
+       private int keyCnt;
+       private int valCnt;
+
+       /**
+        * @param numKeys the number of distinct keys
+        * @param numVals the number of distinct values per key
+        * @param repeatKey whether all values of one key are emitted before moving to the next key
+        */
+       public UniformIntStringTupleGenerator(int numKeys, int numVals, boolean repeatKey) {
+               this.numKeys = numKeys;
+               this.numVals = numVals;
+               this.repeatKey = repeatKey;
+       }
+
+       /**
+        * Writes the next tuple into {@code target}.
+        *
+        * @param target the tuple whose fields are overwritten
+        * @return {@code target}, or {@code null} once the sequence is exhausted
+        */
+       @Override
+       public Tuple2<Integer, String> next(Tuple2<Integer, String> target) {
+               // Without this guard a non-positive dimension never hits the '==' wrap-around
+               // checks below, and the generator would keep emitting tuples.
+               if (numKeys <= 0 || numVals <= 0) {
+                       return null;
+               }
+
+               if (repeatKey) {
+                       if (keyCnt >= numKeys) {
+                               return null;
+                       }
+
+                       target.f0 = keyCnt;
+                       target.f1 = Integer.toBinaryString(valCnt++);
+
+                       // all values of this key are done, move on to the next key
+                       if (valCnt == numVals) {
+                               valCnt = 0;
+                               keyCnt++;
+                       }
+               }
+               else {
+                       if (valCnt >= numVals) {
+                               return null;
+                       }
+
+                       target.f0 = keyCnt++;
+                       target.f1 = Integer.toBinaryString(valCnt);
+
+                       // all keys were emitted for this value, start over with the next value
+                       if (keyCnt == numKeys) {
+                               keyCnt = 0;
+                               valCnt++;
+                       }
+               }
+
+               return target;
+       }
+
+       /**
+        * Returns the next tuple in a newly created instance.
+        *
+        * @return the next tuple, or {@code null} once the sequence is exhausted
+        */
+       @Override
+       public Tuple2<Integer, String> next() {
+               return next(new Tuple2<Integer, String>());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java
new file mode 100644
index 0000000..457b4ad
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Test data source that emits every combination of {@code numKeys} integer keys
+ * ({@code 0 .. numKeys-1}) and {@code numVals} integer values ({@code 0 .. numVals-1}).
+ *
+ * <p>With {@code repeatKey == false}, all keys are emitted for value 0, then all keys for
+ * value 1, and so on. With {@code repeatKey == true}, all values are emitted for key 0,
+ * then all values for key 1, and so on.
+ *
+ * <p>If {@code numKeys} or {@code numVals} is not positive, the sequence is empty.
+ */
+public class UniformIntTupleGenerator implements MutableObjectIterator<Tuple2<Integer, Integer>> {
+
+       private final int numKeys;
+       private final int numVals;
+       private final boolean repeatKey;
+
+       private int keyCnt = 0;
+       private int valCnt = 0;
+
+       /**
+        * @param numKeys the number of distinct keys
+        * @param numVals the number of distinct values per key
+        * @param repeatKey whether all values of one key are emitted before moving to the next key
+        */
+       public UniformIntTupleGenerator(int numKeys, int numVals, boolean repeatKey) {
+               this.numKeys = numKeys;
+               this.numVals = numVals;
+               this.repeatKey = repeatKey;
+       }
+
+       /**
+        * Writes the next tuple into {@code target}.
+        *
+        * @param target the tuple whose fields are overwritten
+        * @return {@code target}, or {@code null} once the sequence is exhausted
+        */
+       @Override
+       public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> target) {
+               // Without this guard a non-positive dimension never hits the '==' wrap-around
+               // checks below, and the generator would keep emitting tuples.
+               if (numKeys <= 0 || numVals <= 0) {
+                       return null;
+               }
+
+               if (repeatKey) {
+                       if (keyCnt >= numKeys) {
+                               return null;
+                       }
+
+                       target.f0 = keyCnt;
+                       target.f1 = valCnt++;
+
+                       // all values of this key are done, move on to the next key
+                       if (valCnt == numVals) {
+                               valCnt = 0;
+                               keyCnt++;
+                       }
+               }
+               else {
+                       if (valCnt >= numVals) {
+                               return null;
+                       }
+
+                       target.f0 = keyCnt++;
+                       target.f1 = valCnt;
+
+                       // all keys were emitted for this value, start over with the next value
+                       if (keyCnt == numKeys) {
+                               keyCnt = 0;
+                               valCnt++;
+                       }
+               }
+
+               return target;
+       }
+
+       /**
+        * Returns the next tuple in a newly created instance.
+        *
+        * @return the next tuple, or {@code null} once the sequence is exhausted
+        */
+       @Override
+       public Tuple2<Integer, Integer> next() {
+               return next(new Tuple2<Integer, Integer>());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
index 3a76ebd..1127fca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
@@ -16,26 +16,30 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.testutils;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.util.MutableObjectIterator;
 
-
 /**
  * An iterator that returns the union of a given set of iterators.
  */
-public class UnionIterator<E> implements MutableObjectIterator<E>
-{
+public class UnionIterator<E> implements MutableObjectIterator<E> {
+       
        private MutableObjectIterator<E> currentSource;
        
        private List<MutableObjectIterator<E>> nextSources;
+
+
+       public UnionIterator(MutableObjectIterator<E>... iterators) {
+               this(new 
ArrayList<MutableObjectIterator<E>>(Arrays.asList(iterators)));
+       }
        
-       public UnionIterator(List<MutableObjectIterator<E>> sources)
-       {
+       public UnionIterator(List<MutableObjectIterator<E>> sources) {
                this.currentSource = sources.remove(0);
                this.nextSources = sources;
        }

Reply via email to