[FLINK-1982] [record-api] Remove dependencies on Record API from flink-runtime tests

Rename Match*Test to Join*Test and MapTaskTest to FlatMapTaskTest

This closes #1294


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c8a6588
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c8a6588
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c8a6588

Branch: refs/heads/master
Commit: 3c8a6588ac9bd35984d6e3b1eef916461a262fe4
Parents: 7ff071f
Author: Fabian Hueske <fhue...@apache.org>
Authored: Thu Oct 22 21:10:41 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Oct 23 13:01:37 2015 +0200

----------------------------------------------------------------------
 .../operators/CombineTaskExternalITCase.java    |   16 +-
 .../flink/runtime/operators/CrossTaskTest.java  |   41 +-
 .../runtime/operators/DataSinkTaskTest.java     |   52 +-
 .../runtime/operators/DataSourceTaskTest.java   |   16 +-
 .../runtime/operators/FlatMapTaskTest.java      |  149 +++
 .../operators/JoinTaskExternalITCase.java       |  165 +++
 .../flink/runtime/operators/JoinTaskTest.java   | 1017 +++++++++++++++++
 .../flink/runtime/operators/MapTaskTest.java    |  151 ---
 .../operators/MatchTaskExternalITCase.java      |  167 ---
 .../flink/runtime/operators/MatchTaskTest.java  | 1019 ------------------
 .../operators/ReduceTaskExternalITCase.java     |   16 +-
 .../flink/runtime/operators/ReduceTaskTest.java |   19 +-
 .../operators/chaining/ChainTaskTest.java       |   25 +-
 .../operators/testutils/TaskTestBase.java       |   18 +-
 14 files changed, 1428 insertions(+), 1443 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 4905e57..800bca7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -22,11 +22,11 @@ import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -42,7 +42,7 @@ public class CombineTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFun
 
        private final double combine_frac;
        
-       private final ArrayList<Record> outList = new ArrayList<Record>();
+       private final ArrayList<Record> outList = new ArrayList<>();
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator = new RecordComparator(
@@ -69,7 +69,7 @@ public class CombineTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFun
                getTaskConfig().setRelativeMemoryDriver(combine_frac);
                getTaskConfig().setFilehandlesDriver(2);
                
-               final GroupReduceCombineDriver<Record, Record> testTask = new 
GroupReduceCombineDriver<Record, Record>();
+               final GroupReduceCombineDriver<Record, Record> testTask = new 
GroupReduceCombineDriver<>();
                
                try {
                        testDriver(testTask, MockCombiningReduceStub.class);
@@ -85,7 +85,7 @@ public class CombineTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFun
                
                // wee need to do the final aggregation manually in the test, 
because the
                // combiner is not guaranteed to do that
-               final HashMap<IntValue, IntValue> aggMap = new 
HashMap<IntValue, IntValue>();
+               final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
                for (Record record : this.outList) {
                        IntValue key = new IntValue();
                        IntValue value = new IntValue();
@@ -123,7 +123,7 @@ public class CombineTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFun
                getTaskConfig().setRelativeMemoryDriver(combine_frac);
                getTaskConfig().setFilehandlesDriver(2);
                
-               final GroupReduceCombineDriver<Record, Record> testTask = new 
GroupReduceCombineDriver<Record, Record>();
+               final GroupReduceCombineDriver<Record, Record> testTask = new 
GroupReduceCombineDriver<>();
 
                try {
                        testDriver(testTask, MockCombiningReduceStub.class);
@@ -139,7 +139,7 @@ public class CombineTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFun
                
                // wee need to do the final aggregation manually in the test, 
because the
                // combiner is not guaranteed to do that
-               final HashMap<IntValue, IntValue> aggMap = new 
HashMap<IntValue, IntValue>();
+               final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
                for (Record record : this.outList) {
                        IntValue key = new IntValue();
                        IntValue value = new IntValue();
@@ -166,7 +166,7 @@ public class CombineTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFun
        // 
------------------------------------------------------------------------
        // 
------------------------------------------------------------------------
 
-       @ReduceOperator.Combinable
+       @Combinable
        public static class MockCombiningReduceStub extends 
RichGroupReduceFunction<Record, Record> {
                private static final long serialVersionUID = 1L;
 
@@ -194,7 +194,7 @@ public class CombineTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFun
                }
        }
 
-       @ReduceOperator.Combinable
+       @Combinable
        public static final class MockFailingCombiningReduceStub extends 
RichGroupReduceFunction<Record, Record> {
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
index 4c27a68..2d8838a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
@@ -31,7 +31,6 @@ import 
org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.Record;
 import org.junit.Test;
 
-@SuppressWarnings("deprecation")
 public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, 
Record, Record>> {
        
        private static final long CROSS_MEM = 1024 * 1024;
@@ -65,7 +64,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockCrossStub.class);
@@ -95,7 +94,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockCrossStub.class);
@@ -123,7 +122,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockFailingCrossStub.class);
@@ -153,7 +152,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockFailingCrossStub.class);
@@ -184,7 +183,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockCrossStub.class);
@@ -215,7 +214,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockCrossStub.class);
@@ -243,7 +242,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockFailingCrossStub.class);
@@ -272,7 +271,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockFailingCrossStub.class);
@@ -303,7 +302,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockCrossStub.class);
@@ -333,7 +332,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockCrossStub.class);
@@ -363,7 +362,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockCrossStub.class);
@@ -393,7 +392,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                try {
                        testDriver(testTask, MockCrossStub.class);
@@ -420,7 +419,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                final AtomicBoolean success = new AtomicBoolean(false);
                
@@ -463,7 +462,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                final AtomicBoolean success = new AtomicBoolean(false);
                
@@ -485,7 +484,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
                try {
                        tct.join();
-                       taskRunner.join();              
+                       taskRunner.join();
                } catch(InterruptedException ie) {
                        Assert.fail("Joining threads failed");
                }
@@ -506,7 +505,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                final AtomicBoolean success = new AtomicBoolean(false);
                
@@ -549,7 +548,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
                getTaskConfig().setRelativeMemoryDriver(cross_frac);
                
-               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<Record, Record, Record>();
+               final CrossDriver<Record, Record, Record> testTask = new 
CrossDriver<>();
                
                final AtomicBoolean success = new AtomicBoolean(false);
                
@@ -571,7 +570,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                
                try {
                        tct.join();
-                       taskRunner.join();              
+                       taskRunner.join();
                } catch(InterruptedException ie) {
                        Assert.fail("Joining threads failed");
                }
@@ -579,7 +578,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                Assert.assertTrue("Exception was thrown despite proper 
canceling.", success.get());
        }
        
-       public static final class MockCrossStub extends 
org.apache.flink.api.java.record.functions.CrossFunction {
+       public static final class MockCrossStub implements 
CrossFunction<Record, Record, Record> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -588,7 +587,7 @@ public class CrossTaskTest extends 
DriverTestBase<CrossFunction<Record, Record,
                }
        }
        
-       public static final class MockFailingCrossStub extends 
org.apache.flink.api.java.record.functions.CrossFunction {
+       public static final class MockFailingCrossStub implements 
CrossFunction<Record, Record, Record> {
                private static final long serialVersionUID = 1L;
                
                private int cnt = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index e91d338..b741b64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -30,7 +30,6 @@ import 
org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.junit.After;
 import org.junit.Assert;
@@ -81,7 +80,7 @@ public class DataSinkTaskTest extends TaskTestBase
                        super.initEnvironment(MEMORY_MANAGER_SIZE, 
NETWORK_BUFFER_SIZE);
                        super.addInput(new UniformRecordGenerator(keyCnt, 
valCnt, false), 0);
 
-                       DataSinkTask<Record> testTask = new 
DataSinkTask<Record>();
+                       DataSinkTask<Record> testTask = new DataSinkTask<>();
 
                        super.registerFileOutputTask(testTask, 
MockOutputFormat.class, new File(tempTestPath).toURI().toString());
 
@@ -94,7 +93,7 @@ public class DataSinkTaskTest extends TaskTestBase
                        fr = new FileReader(tempTestFile);
                        br = new BufferedReader(fr);
 
-                       HashMap<Integer, HashSet<Integer>> keyValueCountMap = 
new HashMap<Integer, HashSet<Integer>>(keyCnt);
+                       HashMap<Integer, HashSet<Integer>> keyValueCountMap = 
new HashMap<>(keyCnt);
 
                        while (br.ready()) {
                                String line = br.readLine();
@@ -144,7 +143,7 @@ public class DataSinkTaskTest extends TaskTestBase
                readers[2] = super.addInput(new UniformRecordGenerator(keyCnt, 
valCnt, keyCnt * 2, 0, false), 0, false);
                readers[3] = super.addInput(new UniformRecordGenerator(keyCnt, 
valCnt, keyCnt * 3, 0, false), 0, false);
 
-               DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+               DataSinkTask<Record> testTask = new DataSinkTask<>();
 
                super.registerFileOutputTask(testTask, MockOutputFormat.class, 
new File(tempTestPath).toURI().toString());
 
@@ -171,7 +170,7 @@ public class DataSinkTaskTest extends TaskTestBase
                        fr = new FileReader(tempTestFile);
                        br = new BufferedReader(fr);
 
-                       HashMap<Integer,HashSet<Integer>> keyValueCountMap = 
new HashMap<Integer, HashSet<Integer>>(keyCnt);
+                       HashMap<Integer,HashSet<Integer>> keyValueCountMap = 
new HashMap<>(keyCnt);
 
                        while(br.ready()) {
                                String line = br.readLine();
@@ -219,12 +218,12 @@ public class DataSinkTaskTest extends TaskTestBase
 
                super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 
true), 0);
 
-               DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+               DataSinkTask<Record> testTask = new DataSinkTask<>();
 
                // set sorting
                super.getTaskConfig().setInputLocalStrategy(0, 
LocalStrategy.SORT);
                super.getTaskConfig().setInputComparator(
-                               new RecordComparatorFactory(new 
int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})), 0);
+                               new RecordComparatorFactory(new int[]{1},(new 
Class[]{IntValue.class})), 0);
                super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
                super.getTaskConfig().setFilehandlesInput(0, 8);
                super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -248,7 +247,7 @@ public class DataSinkTaskTest extends TaskTestBase
                        fr = new FileReader(tempTestFile);
                        br = new BufferedReader(fr);
 
-                       Set<Integer> keys = new HashSet<Integer>();
+                       Set<Integer> keys = new HashSet<>();
 
                        int curVal = -1;
                        while(br.ready()) {
@@ -297,7 +296,7 @@ public class DataSinkTaskTest extends TaskTestBase
                super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
                super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 
false), 0);
 
-               DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+               DataSinkTask<Record> testTask = new DataSinkTask<>();
                Configuration stubParams = new Configuration();
                super.getTaskConfig().setStubParameters(stubParams);
 
@@ -323,21 +322,20 @@ public class DataSinkTaskTest extends TaskTestBase
        public void testFailingSortingDataSinkTask() {
 
                int keyCnt = 100;
-               int valCnt = 20;;
+               int valCnt = 20;
                double memoryFraction = 1.0;
 
                super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
                super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 
true), 0);
 
-               DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+               DataSinkTask<Record> testTask = new DataSinkTask<>();
                Configuration stubParams = new Configuration();
                super.getTaskConfig().setStubParameters(stubParams);
 
                // set sorting
                super.getTaskConfig().setInputLocalStrategy(0, 
LocalStrategy.SORT);
                super.getTaskConfig().setInputComparator(
-                               new RecordComparatorFactory(new int[]{1}, 
((Class<? extends Key<?>>[]) new Class[]{IntValue.class})),
-                               0);
+                               new RecordComparatorFactory(new int[]{1}, ( new 
Class[]{IntValue.class})), 0);
                super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
                super.getTaskConfig().setFilehandlesInput(0, 8);
                super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -365,7 +363,7 @@ public class DataSinkTaskTest extends TaskTestBase
                super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
                super.addInput(new InfiniteInputIterator(), 0);
 
-               final DataSinkTask<Record> testTask = new 
DataSinkTask<Record>();
+               final DataSinkTask<Record> testTask = new DataSinkTask<>();
                Configuration stubParams = new Configuration();
                super.getTaskConfig().setStubParameters(stubParams);
 
@@ -407,15 +405,14 @@ public class DataSinkTaskTest extends TaskTestBase
                super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
                super.addInput(new InfiniteInputIterator(), 0);
 
-               final DataSinkTask<Record> testTask = new 
DataSinkTask<Record>();
+               final DataSinkTask<Record> testTask = new DataSinkTask<>();
                Configuration stubParams = new Configuration();
                super.getTaskConfig().setStubParameters(stubParams);
 
                // set sorting
                super.getTaskConfig().setInputLocalStrategy(0, 
LocalStrategy.SORT);
                super.getTaskConfig().setInputComparator(
-                               new RecordComparatorFactory(new 
int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})),
-                               0);
+                               new RecordComparatorFactory(new int[]{1},(new 
Class[]{IntValue.class})), 0);
                super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
                super.getTaskConfig().setFilehandlesInput(0, 8);
                super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -447,7 +444,7 @@ public class DataSinkTaskTest extends TaskTestBase
 
        }
 
-       public static class MockOutputFormat extends DelimitedOutputFormat {
+       public static class MockOutputFormat extends FileOutputFormat<Record> {
                private static final long serialVersionUID = 1L;
 
                final StringBuilder bld = new StringBuilder();
@@ -458,8 +455,7 @@ public class DataSinkTaskTest extends TaskTestBase
                }
 
                @Override
-               public int serializeRecord(Record rec, byte[] target) throws 
Exception
-               {
+               public void writeRecord(Record rec) throws IOException {
                        IntValue key = rec.getField(0, IntValue.class);
                        IntValue value = rec.getField(1, IntValue.class);
 
@@ -467,14 +463,11 @@ public class DataSinkTaskTest extends TaskTestBase
                        this.bld.append(key.getValue());
                        this.bld.append('_');
                        this.bld.append(value.getValue());
+                       this.bld.append('\n');
 
                        byte[] bytes = this.bld.toString().getBytes();
-                       if (bytes.length <= target.length) {
-                               System.arraycopy(bytes, 0, target, 0, 
bytes.length);
-                               return bytes.length;
-                       }
-                       // else
-                       return -bytes.length;
+
+                       this.stream.write(bytes);
                }
 
        }
@@ -490,12 +483,11 @@ public class DataSinkTaskTest extends TaskTestBase
                }
 
                @Override
-               public int serializeRecord(Record rec, byte[] target) throws 
Exception
-               {
+               public void writeRecord(Record rec) throws IOException {
                        if (++this.cnt >= 10) {
                                throw new RuntimeException("Expected Test 
Exception");
                        }
-                       return super.serializeRecord(rec, target);
+                       super.writeRecord(rec);
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 4548410..96ae700 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -28,11 +28,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.junit.Assert;
 
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
 import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -83,7 +83,7 @@ public class DataSourceTaskTest extends TaskTestBase {
                super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
                super.addOutput(this.outList);
                
-               DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+               DataSourceTask<Record> testTask = new DataSourceTask<>();
                
                super.registerFileInputTask(testTask, MockInputFormat.class, 
new File(tempTestPath).toURI().toString(), "\n");
                
@@ -97,7 +97,7 @@ public class DataSourceTaskTest extends TaskTestBase {
                Assert.assertTrue("Invalid output size. Expected: "+(keyCnt*valCnt)+" Actual: "+this.outList.size(),
                        this.outList.size() == keyCnt * valCnt);
                
-               HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
+               HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
                
                for (Record kvp : this.outList) {
                        
@@ -138,7 +138,7 @@ public class DataSourceTaskTest extends TaskTestBase {
                super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
                super.addOutput(this.outList);
                
-               DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+               DataSourceTask<Record> testTask = new DataSourceTask<>();
 
                super.registerFileInputTask(testTask, MockFailingInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
                
@@ -172,7 +172,7 @@ public class DataSourceTaskTest extends TaskTestBase {
                        Assert.fail("Unable to set-up test input file");
                }
                
-               final DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+               final DataSourceTask<Record> testTask = new DataSourceTask<>();
 
                super.registerFileInputTask(testTask, MockDelayingInputFormat.class,  new File(tempTestPath).toURI().toString(), "\n");
                
@@ -232,7 +232,7 @@ public class DataSourceTaskTest extends TaskTestBase {
                }
        }
        
-       public static class MockInputFormat extends DelimitedInputFormat {
+       public static class MockInputFormat extends DelimitedInputFormat<Record> {
                private static final long serialVersionUID = 1L;
                
                private final IntValue key = new IntValue();
@@ -257,7 +257,7 @@ public class DataSourceTaskTest extends TaskTestBase {
                }
        }
        
-       public static class MockDelayingInputFormat extends DelimitedInputFormat {
+       public static class MockDelayingInputFormat extends DelimitedInputFormat<Record> {
                private static final long serialVersionUID = 1L;
                
                private final IntValue key = new IntValue();
@@ -289,7 +289,7 @@ public class DataSourceTaskTest extends TaskTestBase {
                
        }
        
-       public static class MockFailingInputFormat extends DelimitedInputFormat {
+       public static class MockFailingInputFormat extends DelimitedInputFormat<Record> {
                private static final long serialVersionUID = 1L;
                
                private final IntValue key = new IntValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
new file mode 100644
index 0000000..3d7f99e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
+import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlatMapTaskTest extends DriverTestBase<FlatMapFunction<Record, Record>> {
+       
+       private static final Logger LOG = LoggerFactory.getLogger(FlatMapTaskTest.class);
+       
+       private final CountingOutputCollector output = new CountingOutputCollector();
+       
+       
+       public FlatMapTaskTest(ExecutionConfig config) {
+               super(config, 0, 0);
+       }
+       
+       @Test
+       public void testMapTask() {
+               final int keyCnt = 100;
+               final int valCnt = 20;
+               
+               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+               setOutput(this.output);
+               
+               final FlatMapDriver<Record, Record> testDriver = new FlatMapDriver<>();
+               
+               try {
+                       testDriver(testDriver, MockMapStub.class);
+               } catch (Exception e) {
+                       LOG.debug("Exception while running the test driver.", e);
+                       Assert.fail("Invoke method caused exception.");
+               }
+               
+               Assert.assertEquals("Wrong result set size.", keyCnt*valCnt, this.output.getNumberOfRecords());
+       }
+       
+       @Test
+       public void testFailingMapTask() {
+               final int keyCnt = 100;
+               final int valCnt = 20;
+               
+               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+               setOutput(new DiscardingOutputCollector<Record>());
+               
+               final FlatMapDriver<Record, Record> testTask = new FlatMapDriver<>();
+               try {
+                       testDriver(testTask, MockFailingMapStub.class);
+                       Assert.fail("Function exception was not forwarded.");
+               } catch (ExpectedTestException e) {
+                       // good!
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("Exception in test.");
+               }
+       }
+       
+       @Test
+       public void testCancelMapTask() {
+               addInput(new InfiniteInputIterator());
+               setOutput(new DiscardingOutputCollector<Record>());
+               
+               final FlatMapDriver<Record, Record> testTask = new FlatMapDriver<>();
+               
+               final AtomicBoolean success = new AtomicBoolean(false);
+               
+               final Thread taskRunner = new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       testDriver(testTask, MockMapStub.class);
+                                       success.set(true);
+                               } catch (Exception ie) {
+                                       ie.printStackTrace();
+                               }
+                       }
+               };
+               taskRunner.start();
+               
+               TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
+               tct.start();
+               
+               try {
+                       tct.join();
+                       taskRunner.join();              
+               } catch(InterruptedException ie) {
+                       Assert.fail("Joining threads failed");
+               }
+               
+               Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+       }
+       
+       public static class MockMapStub extends RichFlatMapFunction<Record, Record> {
+               private static final long serialVersionUID = 1L;
+               
+               @Override
+               public void flatMap(Record record, Collector<Record> out) throws Exception {
+                       out.collect(record);
+               }
+               
+       }
+       
+       public static class MockFailingMapStub extends RichFlatMapFunction<Record, Record> {
+               private static final long serialVersionUID = 1L;
+               
+               private int cnt = 0;
+               
+               @Override
+               public void flatMap(Record record, Collector<Record> out) throws Exception {
+                       if (++this.cnt >= 10) {
+                               throw new ExpectedTestException();
+                       }
+                       out.collect(record);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
new file mode 100644
index 0000000..5b2e6eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.junit.Assert;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class JoinTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+       
+       private static final long HASH_MEM = 4*1024*1024;
+       
+       private static final long SORT_MEM = 3*1024*1024;
+       
+       private static final long BNLJN_MEM = 10 * PAGE_SIZE;
+
+       private final double bnljn_frac;
+
+       private final double hash_frac;
+
+       @SuppressWarnings("unchecked")
+       private final RecordComparator comparator1 = new RecordComparator(
+               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+       
+       @SuppressWarnings("unchecked")
+       private final RecordComparator comparator2 = new RecordComparator(
+               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+       
+       private final CountingOutputCollector output = new CountingOutputCollector();
+       
+       public JoinTaskExternalITCase(ExecutionConfig config) {
+               super(config, HASH_MEM, 2, SORT_MEM);
+               bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
+               hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
+       }
+       
+       @Test
+       public void testExternalSort1MatchTask() {
+               final int keyCnt1 = 16384*4;
+               final int valCnt1 = 2;
+               
+               final int keyCnt2 = 8192;
+               final int valCnt2 = 4*2;
+               
+               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+               
+               setOutput(this.output);
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+               setNumFileHandlesForSort(4);
+               
+               final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+               
+               try {
+                       addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+                       addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+                       testDriver(testTask, MockMatchStub.class);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("The test caused an exception.");
+               }
+               
+               Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
+       }
+       
+       @Test
+       public void testExternalHash1MatchTask() {
+               final int keyCnt1 = 32768;
+               final int valCnt1 = 8;
+               
+               final int keyCnt2 = 65536;
+               final int valCnt2 = 8;
+               
+               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+               
+               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               setOutput(this.output);
+               getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+               getTaskConfig().setRelativeMemoryDriver(hash_frac);
+               
+               JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+               
+               try {
+                       testDriver(testTask, MockMatchStub.class);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("Test caused an exception.");
+               }
+               
+               Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
+       }
+       
+       @Test
+       public void testExternalHash2MatchTask() {
+               final int keyCnt1 = 32768;
+               final int valCnt1 = 8;
+               
+               final int keyCnt2 = 65536;
+               final int valCnt2 = 8;
+               
+               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+               
+               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               setOutput(this.output);
+               getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+               getTaskConfig().setRelativeMemoryDriver(hash_frac);
+               
+               JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+               
+               try {
+                       testDriver(testTask, MockMatchStub.class);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("Test caused an exception.");
+               }
+               
+               Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
+       }
+       
+       public static final class MockMatchStub implements FlatJoinFunction<Record, Record, Record> {
+               private static final long serialVersionUID = 1L;
+               
+               @Override
+               public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
+                       out.collect(value1);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
new file mode 100644
index 0000000..ecde59e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
@@ -0,0 +1,1017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
+import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class JoinTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+       
+       private static final long HASH_MEM = 6*1024*1024;
+       
+       private static final long SORT_MEM = 3*1024*1024;
+
+       private static final int NUM_SORTER = 2;
+       
+       private static final long BNLJN_MEM = 10 * PAGE_SIZE;
+
+       private final double bnljn_frac;
+
+       private final double hash_frac;
+       
+       @SuppressWarnings("unchecked")
+       private final RecordComparator comparator1 = new RecordComparator(
+               new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+       
+       @SuppressWarnings("unchecked")
+       private final RecordComparator comparator2 = new RecordComparator(
+               new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+       
+       private final List<Record> outList = new ArrayList<>();
+       
+       
+       public JoinTaskTest(ExecutionConfig config) {
+               super(config, HASH_MEM, NUM_SORTER, SORT_MEM);
+               bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
+               hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
+       }
+       
+       
+       @Test
+       public void testSortBoth1MatchTask() {
+               final int keyCnt1 = 20;
+               final int valCnt1 = 1;
+               
+               final int keyCnt2 = 10;
+               final int valCnt2 = 2;
+               
+               setOutput(this.outList);
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+               setNumFileHandlesForSort(4);
+               
+               final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+               
+               try {
+                       addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+                       addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+                       testDriver(testTask, MockMatchStub.class);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("The test caused an exception.");
+               }
+               
+               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+               Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt);
+               
+               this.outList.clear();
+       }
+       
+       @Test
+       public void testSortBoth2MatchTask() {
+
+               int keyCnt1 = 20;
+               int valCnt1 = 1;
+               
+               int keyCnt2 = 20;
+               int valCnt2 = 1;
+               
+               setOutput(this.outList);
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+               setNumFileHandlesForSort(4);
+               
+               final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+               
+               try {
+                       addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+                       addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+                       testDriver(testTask, MockMatchStub.class);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("The test caused an exception.");
+               }
+               
+               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+               
+               Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+               
+               this.outList.clear();
+               
+       }
+       
+       @Test
+       public void testSortBoth3MatchTask() {
+
+               int keyCnt1 = 20;
+               int valCnt1 = 1;
+               
+               int keyCnt2 = 20;
+               int valCnt2 = 20;
+               
+               setOutput(this.outList);
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+               setNumFileHandlesForSort(4);
+               
+               final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+               
+               try {
+                       addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+                       addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+                       testDriver(testTask, MockMatchStub.class);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("The test caused an exception.");
+               }
+               
+               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+               
+               Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+               
+               this.outList.clear();
+               
+       }
+       
+       @Test
+       public void testSortBoth4MatchTask() {
+
+               int keyCnt1 = 20;
+               int valCnt1 = 20;
+               
+               int keyCnt2 = 20;
+               int valCnt2 = 1;
+               
+               setOutput(this.outList);
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+               setNumFileHandlesForSort(4);
+               
+               final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+               
+               try {
+                       addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+                       addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+                       testDriver(testTask, MockMatchStub.class);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("The test caused an exception.");
+               }
+               
+               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+               
+               Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+               
+               this.outList.clear();
+               
+       }
+       
+       @Test
+       public void testSortBoth5MatchTask() {
+
+               int keyCnt1 = 20;
+               int valCnt1 = 20;
+               
+               int keyCnt2 = 20;
+               int valCnt2 = 20;
+               
+               setOutput(this.outList);
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+               setNumFileHandlesForSort(4);
+               
+               final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+               
+               try {
+                       addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+                       addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+                       testDriver(testTask, MockMatchStub.class);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("The test caused an exception.");
+               }
+               
+               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+               
+               Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+               
+               this.outList.clear();
+               
+       }
+       
+       @Test
+       public void testSortFirstMatchTask() {
+
+               int keyCnt1 = 20;
+               int valCnt1 = 20;
+               
+               int keyCnt2 = 20;
+               int valCnt2 = 20;
+               
+               setOutput(this.outList);
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+               setNumFileHandlesForSort(4);
+               
+               final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+               
+               try {
+                       addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+                       addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
+                       testDriver(testTask, MockMatchStub.class);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("The test caused an exception.");
+               }
+               
+               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+               
+               Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+               
+               this.outList.clear();
+               
+       }
+       
+       @Test
+       public void testSortSecondMatchTask() {
+
+               int keyCnt1 = 20;
+               int valCnt1 = 20;
+               
+               int keyCnt2 = 20;
+               int valCnt2 = 20;
+               
+               setOutput(this.outList);
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+               setNumFileHandlesForSort(4);
+               
+               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<>();
+               
+               try {
+                       addInput(new UniformRecordGenerator(keyCnt1, valCnt1, 
true));
+                       addInputSorted(new UniformRecordGenerator(keyCnt2, 
valCnt2, false), this.comparator2.duplicate());
+                       testDriver(testTask, MockMatchStub.class);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("The test caused an exception.");
+               }
+               
+               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+               
+               Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+expCnt, this.outList.size() == expCnt);
+               
+               this.outList.clear();
+               
+       }
+       
+       /**
+        * Runs a {@link JoinDriver} with the {@code INNER_MERGE} strategy on two inputs that are
+        * both registered through {@code addInput}, then checks the number of collected records.
+        */
+       @Test
+       public void testMergeMatchTask() {
+               final int firstKeys = 20;
+               final int firstVals = 20;
+               final int secondKeys = 20;
+               final int secondVals = 20;
+
+               // Output size the test expects for the two generated inputs.
+               final int expCnt = firstVals * secondVals * Math.min(firstKeys, secondKeys);
+
+               setOutput(this.outList);
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+               setNumFileHandlesForSort(4);
+
+               final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+               addInput(new UniformRecordGenerator(firstKeys, firstVals, true));
+               addInput(new UniformRecordGenerator(secondKeys, secondVals, true));
+
+               try {
+                       testDriver(driver, MockMatchStub.class);
+               } catch (Exception ex) {
+                       ex.printStackTrace();
+                       Assert.fail("The test caused an exception.");
+               }
+
+               final int actual = this.outList.size();
+               Assert.assertTrue("Resultset size was " + actual + ". Expected was " + expCnt, actual == expCnt);
+
+               this.outList.clear();
+       }
+       
+       /**
+        * Runs a {@link JoinDriver} with the {@code INNER_MERGE} strategy and a join function that
+        * throws {@link ExpectedTestException}; the test passes only if that exception reaches
+        * the caller of {@code testDriver}.
+        */
+       @Test
+       public void testFailingMatchTask() {
+               final int firstKeys = 20;
+               final int firstVals = 20;
+               final int secondKeys = 20;
+               final int secondVals = 20;
+
+               setOutput(new NirvanaOutputList());
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+               setNumFileHandlesForSort(4);
+
+               final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+               addInput(new UniformRecordGenerator(firstKeys, firstVals, true));
+               addInput(new UniformRecordGenerator(secondKeys, secondVals, true));
+
+               try {
+                       testDriver(driver, MockFailingMatchStub.class);
+                       Assert.fail("Driver did not forward Exception.");
+               } catch (ExpectedTestException expected) {
+                       // This is the outcome the test is looking for.
+               } catch (Exception unexpected) {
+                       unexpected.printStackTrace();
+                       Assert.fail("The test caused an exception.");
+               }
+       }
+       
+       /**
+        * Starts an {@code INNER_MERGE} {@link JoinDriver} in a separate thread whose first input is
+        * a {@code DelayingInfinitiveInputIterator} registered via {@code addInputSorted}, cancels
+        * it after one second, and requires the thread to end within 60 seconds without an error.
+        */
+       @Test
+       public void testCancelMatchTaskWhileSort1() {
+               final int keyCnt = 20;
+               final int valCnt = 20;
+
+               try {
+                       setOutput(new NirvanaOutputList());
+                       addDriverComparator(this.comparator1);
+                       addDriverComparator(this.comparator2);
+                       getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+                       getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+                       getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+                       setNumFileHandlesForSort(4);
+
+                       final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+                       try {
+                               addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+                               addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+                       } catch (Exception setupFailure) {
+                               setupFailure.printStackTrace();
+                               Assert.fail("The test caused an exception.");
+                       }
+
+                       // Carries any Throwable raised inside the driver thread back to the test thread.
+                       final AtomicReference<Throwable> failure = new AtomicReference<>();
+
+                       final Runnable driverRun = new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               testDriver(driver, MockMatchStub.class);
+                                       }
+                                       catch (Throwable t) {
+                                               failure.set(t);
+                                       }
+                               }
+                       };
+                       final Thread taskRunner = new Thread(driverRun, "Task runner for testCancelMatchTaskWhileSort1()");
+                       taskRunner.start();
+
+                       Thread.sleep(1000);
+
+                       cancel();
+                       taskRunner.interrupt();
+
+                       taskRunner.join(60000);
+
+                       assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+                       final Throwable taskError = failure.get();
+                       if (taskError != null) {
+                               taskError.printStackTrace();
+                               fail("Error in task while canceling: " + taskError.getMessage());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       /**
+        * Starts an {@code INNER_MERGE} {@link JoinDriver} in a separate thread whose second input
+        * is a {@code DelayingInfinitiveInputIterator} registered via {@code addInputSorted},
+        * cancels it after one second, and requires the thread to end within 60 seconds without
+        * an error.
+        */
+       @Test
+       public void testCancelMatchTaskWhileSort2() {
+               final int keyCnt = 20;
+               final int valCnt = 20;
+
+               try {
+                       setOutput(new NirvanaOutputList());
+                       addDriverComparator(this.comparator1);
+                       addDriverComparator(this.comparator2);
+                       getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+                       getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+                       getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+                       setNumFileHandlesForSort(4);
+
+                       final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+                       try {
+                               addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+                               // The second input is sorted with the second input's comparator, matching
+                               // testSortSecondMatchTask (previously comparator1 was used here).
+                               addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator2.duplicate());
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                               Assert.fail("The test caused an exception.");
+                       }
+
+                       // Carries any Throwable raised inside the driver thread back to the test thread.
+                       final AtomicReference<Throwable> error = new AtomicReference<>();
+
+                       Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") {
+                               @Override
+                               public void run() {
+                                       try {
+                                               testDriver(testTask, MockMatchStub.class);
+                                       }
+                                       catch (Throwable t) {
+                                               error.set(t);
+                                       }
+                               }
+                       };
+                       taskRunner.start();
+
+                       Thread.sleep(1000);
+
+                       cancel();
+                       taskRunner.interrupt();
+
+                       taskRunner.join(60000);
+
+                       assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+                       Throwable taskError = error.get();
+                       if (taskError != null) {
+                               taskError.printStackTrace();
+                               fail("Error in task while canceling: " + taskError.getMessage());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       /**
+        * Starts an {@code INNER_MERGE} {@link JoinDriver} with {@code MockDelayingMatchStub} in a
+        * separate thread, cancels it after one second, and requires the thread to end within
+        * 60 seconds without an error.
+        */
+       @Test
+       public void testCancelMatchTaskWhileMatching() {
+               final int keyCnt = 20;
+               final int valCnt = 20;
+
+               try {
+                       setOutput(new NirvanaOutputList());
+                       addDriverComparator(this.comparator1);
+                       addDriverComparator(this.comparator2);
+                       getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+                       getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+                       getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+                       setNumFileHandlesForSort(4);
+
+                       final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+                       addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+                       addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+
+                       // Carries any Throwable raised inside the driver thread back to the test thread.
+                       final AtomicReference<Throwable> failure = new AtomicReference<>();
+
+                       final Runnable driverRun = new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               testDriver(driver, MockDelayingMatchStub.class);
+                                       }
+                                       catch (Throwable t) {
+                                               failure.set(t);
+                                       }
+                               }
+                       };
+                       final Thread taskRunner = new Thread(driverRun, "Task runner for testCancelMatchTaskWhileMatching()");
+                       taskRunner.start();
+
+                       Thread.sleep(1000);
+
+                       cancel();
+                       taskRunner.interrupt();
+
+                       taskRunner.join(60000);
+
+                       assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+                       final Throwable taskError = failure.get();
+                       if (taskError != null) {
+                               taskError.printStackTrace();
+                               fail("Error in task while canceling: " + taskError.getMessage());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       /**
+        * Runs a {@link JoinDriver} with the {@code HYBRIDHASH_BUILD_FIRST} strategy on inputs of
+        * 20 keys x 1 value and 10 keys x 2 values and checks the number of collected records.
+        */
+       @Test
+       public void testHash1MatchTask() {
+               final int firstKeys = 20;
+               final int firstVals = 1;
+               final int secondKeys = 10;
+               final int secondVals = 2;
+
+               // Output size the test expects for the two generated inputs.
+               final int expCnt = firstVals * secondVals * Math.min(firstKeys, secondKeys);
+
+               addInput(new UniformRecordGenerator(firstKeys, firstVals, false));
+               addInput(new UniformRecordGenerator(secondKeys, secondVals, false));
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               setOutput(this.outList);
+               getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+               getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+               final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+               try {
+                       testDriver(driver, MockMatchStub.class);
+               } catch (Exception ex) {
+                       ex.printStackTrace();
+                       Assert.fail("Test caused an exception.");
+               }
+
+               Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+               this.outList.clear();
+       }
+       
+       /**
+        * Runs a {@link JoinDriver} with the {@code HYBRIDHASH_BUILD_SECOND} strategy on two inputs
+        * of 20 keys x 1 value each and checks the number of collected records.
+        */
+       @Test
+       public void testHash2MatchTask() {
+               final int firstKeys = 20;
+               final int firstVals = 1;
+               final int secondKeys = 20;
+               final int secondVals = 1;
+
+               // Output size the test expects for the two generated inputs.
+               final int expCnt = firstVals * secondVals * Math.min(firstKeys, secondKeys);
+
+               addInput(new UniformRecordGenerator(firstKeys, firstVals, false));
+               addInput(new UniformRecordGenerator(secondKeys, secondVals, false));
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               setOutput(this.outList);
+               getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+               getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+               final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+               try {
+                       testDriver(driver, MockMatchStub.class);
+               } catch (Exception ex) {
+                       ex.printStackTrace();
+                       Assert.fail("Test caused an exception.");
+               }
+
+               Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+               this.outList.clear();
+       }
+       
+       /**
+        * Runs a {@link JoinDriver} with the {@code HYBRIDHASH_BUILD_FIRST} strategy on inputs of
+        * 20 keys x 1 value and 20 keys x 20 values and checks the number of collected records.
+        */
+       @Test
+       public void testHash3MatchTask() {
+               final int firstKeys = 20;
+               final int firstVals = 1;
+               final int secondKeys = 20;
+               final int secondVals = 20;
+
+               // Output size the test expects for the two generated inputs.
+               final int expCnt = firstVals * secondVals * Math.min(firstKeys, secondKeys);
+
+               addInput(new UniformRecordGenerator(firstKeys, firstVals, false));
+               addInput(new UniformRecordGenerator(secondKeys, secondVals, false));
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               setOutput(this.outList);
+               getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+               getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+               final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+               try {
+                       testDriver(driver, MockMatchStub.class);
+               } catch (Exception ex) {
+                       ex.printStackTrace();
+                       Assert.fail("Test caused an exception.");
+               }
+
+               Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+               this.outList.clear();
+       }
+       
+       /**
+        * Runs a {@link JoinDriver} with the {@code HYBRIDHASH_BUILD_SECOND} strategy on inputs of
+        * 20 keys x 20 values and 20 keys x 1 value and checks the number of collected records.
+        */
+       @Test
+       public void testHash4MatchTask() {
+               final int firstKeys = 20;
+               final int firstVals = 20;
+               final int secondKeys = 20;
+               final int secondVals = 1;
+
+               // Output size the test expects for the two generated inputs.
+               final int expCnt = firstVals * secondVals * Math.min(firstKeys, secondKeys);
+
+               addInput(new UniformRecordGenerator(firstKeys, firstVals, false));
+               addInput(new UniformRecordGenerator(secondKeys, secondVals, false));
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               setOutput(this.outList);
+               getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+               getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+               final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+               try {
+                       testDriver(driver, MockMatchStub.class);
+               } catch (Exception ex) {
+                       ex.printStackTrace();
+                       Assert.fail("Test caused an exception.");
+               }
+
+               Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+               this.outList.clear();
+       }
+       
+       /**
+        * Runs a {@link JoinDriver} with the {@code HYBRIDHASH_BUILD_FIRST} strategy on two inputs
+        * of 20 keys x 20 values each and checks the number of collected records.
+        */
+       @Test
+       public void testHash5MatchTask() {
+               final int firstKeys = 20;
+               final int firstVals = 20;
+               final int secondKeys = 20;
+               final int secondVals = 20;
+
+               // Output size the test expects for the two generated inputs.
+               final int expCnt = firstVals * secondVals * Math.min(firstKeys, secondKeys);
+
+               addInput(new UniformRecordGenerator(firstKeys, firstVals, false));
+               addInput(new UniformRecordGenerator(secondKeys, secondVals, false));
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               setOutput(this.outList);
+               getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+               getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+               final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+               try {
+                       testDriver(driver, MockMatchStub.class);
+               } catch (Exception ex) {
+                       ex.printStackTrace();
+                       Assert.fail("Test caused an exception.");
+               }
+
+               Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+               this.outList.clear();
+       }
+       
+       /**
+        * Runs a {@link JoinDriver} with the {@code HYBRIDHASH_BUILD_FIRST} strategy and a join
+        * function that throws {@link ExpectedTestException}; the test passes only if that
+        * exception reaches the caller of {@code testDriver}.
+        */
+       @Test
+       public void testFailingHashFirstMatchTask() {
+               final int firstKeys = 20;
+               final int firstVals = 20;
+               final int secondKeys = 20;
+               final int secondVals = 20;
+
+               addInput(new UniformRecordGenerator(firstKeys, firstVals, false));
+               addInput(new UniformRecordGenerator(secondKeys, secondVals, false));
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               setOutput(new NirvanaOutputList());
+               getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+               getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+               final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+               try {
+                       testDriver(driver, MockFailingMatchStub.class);
+                       Assert.fail("Function exception was not forwarded.");
+               } catch (ExpectedTestException expected) {
+                       // This is the outcome the test is looking for.
+               } catch (Exception unexpected) {
+                       unexpected.printStackTrace();
+                       Assert.fail("Test caused an exception.");
+               }
+       }
+       
+       /**
+        * Runs a {@link JoinDriver} with the {@code HYBRIDHASH_BUILD_SECOND} strategy and a join
+        * function that throws {@link ExpectedTestException}; the test passes only if that
+        * exception reaches the caller of {@code testDriver}.
+        */
+       @Test
+       public void testFailingHashSecondMatchTask() {
+               final int firstKeys = 20;
+               final int firstVals = 20;
+               final int secondKeys = 20;
+               final int secondVals = 20;
+
+               addInput(new UniformRecordGenerator(firstKeys, firstVals, false));
+               addInput(new UniformRecordGenerator(secondKeys, secondVals, false));
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               setOutput(new NirvanaOutputList());
+               getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+               getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+               final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+               try {
+                       testDriver(driver, MockFailingMatchStub.class);
+                       Assert.fail("Function exception was not forwarded.");
+               } catch (ExpectedTestException expected) {
+                       // This is the outcome the test is looking for.
+               } catch (Exception unexpected) {
+                       unexpected.printStackTrace();
+                       Assert.fail("Test caused an exception.");
+               }
+       }
+       
+       /**
+        * Starts a {@code HYBRIDHASH_BUILD_FIRST} {@link JoinDriver} in a separate thread whose
+        * first input is a {@code DelayingInfinitiveInputIterator}, cancels it after one second,
+        * and requires {@code testDriver} to return without throwing.
+        *
+        * <p>The wait for the driver thread is bounded to 60 seconds so that a failed cancellation
+        * fails this test instead of blocking the whole test run.
+        */
+       @Test
+       public void testCancelHashMatchTaskWhileBuildFirst() {
+               final int keyCnt = 20;
+               final int valCnt = 20;
+
+               try {
+                       addInput(new DelayingInfinitiveInputIterator(100));
+                       addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+
+                       addDriverComparator(this.comparator1);
+                       addDriverComparator(this.comparator2);
+
+                       getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+                       setOutput(new NirvanaOutputList());
+
+                       getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+                       getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+                       final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+                       // Set to true only when testDriver returns normally in the driver thread.
+                       final AtomicBoolean success = new AtomicBoolean(false);
+
+                       Thread taskRunner = new Thread("Task runner for testCancelHashMatchTaskWhileBuildFirst()") {
+                               @Override
+                               public void run() {
+                                       try {
+                                               testDriver(testTask, MockMatchStub.class);
+                                               success.set(true);
+                                       } catch (Exception ie) {
+                                               ie.printStackTrace();
+                                       }
+                               }
+                       };
+                       taskRunner.start();
+
+                       Thread.sleep(1000);
+                       cancel();
+
+                       try {
+                               // Bounded wait: an unbounded join would hang forever if the cancel is ignored.
+                               taskRunner.join(60000);
+                       }
+                       catch (InterruptedException ie) {
+                               Assert.fail("Joining threads failed");
+                       }
+
+                       Assert.assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+                       Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
+       
+       /**
+        * Starts a {@code HYBRIDHASH_BUILD_SECOND} {@link JoinDriver} in a separate thread whose
+        * second input is a {@code DelayingInfinitiveInputIterator}, cancels it after one second,
+        * and requires {@code testDriver} to return without throwing.
+        *
+        * <p>The wait for the driver thread is bounded to 60 seconds so that a failed cancellation
+        * fails this test instead of blocking the whole test run.
+        */
+       @Test
+       public void testHashCancelMatchTaskWhileBuildSecond() {
+               final int keyCnt = 20;
+               final int valCnt = 20;
+
+               try {
+                       addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+                       addInput(new DelayingInfinitiveInputIterator(100));
+
+                       addDriverComparator(this.comparator1);
+                       addDriverComparator(this.comparator2);
+
+                       getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+                       setOutput(new NirvanaOutputList());
+
+                       getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+                       getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+                       final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+                       // Set to true only when testDriver returns normally in the driver thread.
+                       final AtomicBoolean success = new AtomicBoolean(false);
+
+                       Thread taskRunner = new Thread("Task runner for testHashCancelMatchTaskWhileBuildSecond()") {
+                               @Override
+                               public void run() {
+                                       try {
+                                               testDriver(testTask, MockMatchStub.class);
+                                               success.set(true);
+                                       } catch (Exception ie) {
+                                               ie.printStackTrace();
+                                       }
+                               }
+                       };
+                       taskRunner.start();
+
+                       Thread.sleep(1000);
+                       cancel();
+
+                       try {
+                               // Bounded wait: an unbounded join would hang forever if the cancel is ignored.
+                               taskRunner.join(60000);
+                       }
+                       catch (InterruptedException ie) {
+                               Assert.fail("Joining threads failed");
+                       }
+
+                       Assert.assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+                       Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
+       
+       /**
+        * Starts a {@code HYBRIDHASH_BUILD_FIRST} {@link JoinDriver} in a separate thread, lets a
+        * {@code TaskCancelThread} cancel it, and requires {@code testDriver} to return without
+        * throwing.
+        *
+        * <p>The wait for the driver thread is bounded to 60 seconds so that a failed cancellation
+        * fails this test instead of blocking the whole test run.
+        */
+       @Test
+       public void testHashFirstCancelMatchTaskWhileMatching() {
+               int keyCnt = 20;
+               int valCnt = 20;
+
+               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               setOutput(new NirvanaOutputList());
+               getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+               getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+               final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+               // Set to true only when testDriver returns normally in the driver thread.
+               final AtomicBoolean success = new AtomicBoolean(false);
+
+               Thread taskRunner = new Thread("Task runner for testHashFirstCancelMatchTaskWhileMatching()") {
+                       @Override
+                       public void run() {
+                               try {
+                                       testDriver(testTask, MockMatchStub.class);
+                                       success.set(true);
+                               } catch (Exception ie) {
+                                       ie.printStackTrace();
+                               }
+                       }
+               };
+               taskRunner.start();
+
+               TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
+               tct.start();
+
+               try {
+                       tct.join();
+                       // Bounded wait: an unbounded join would hang forever if the cancel is ignored.
+                       taskRunner.join(60000);
+               } catch(InterruptedException ie) {
+                       Assert.fail("Joining threads failed");
+               }
+
+               Assert.assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+               Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+       }
+       
+       /**
+        * Starts a {@code HYBRIDHASH_BUILD_SECOND} {@link JoinDriver} in a separate thread, lets a
+        * {@code TaskCancelThread} cancel it, and requires {@code testDriver} to return without
+        * throwing.
+        *
+        * <p>The wait for the driver thread is bounded to 60 seconds so that a failed cancellation
+        * fails this test instead of blocking the whole test run.
+        */
+       @Test
+       public void testHashSecondCancelMatchTaskWhileMatching() {
+               int keyCnt = 20;
+               int valCnt = 20;
+
+               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+               addDriverComparator(this.comparator1);
+               addDriverComparator(this.comparator2);
+               getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+               setOutput(new NirvanaOutputList());
+               getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+               getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+               final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+               // Set to true only when testDriver returns normally in the driver thread.
+               final AtomicBoolean success = new AtomicBoolean(false);
+
+               Thread taskRunner = new Thread("Task runner for testHashSecondCancelMatchTaskWhileMatching()") {
+                       @Override
+                       public void run() {
+                               try {
+                                       testDriver(testTask, MockMatchStub.class);
+                                       success.set(true);
+                               } catch (Exception ie) {
+                                       ie.printStackTrace();
+                               }
+                       }
+               };
+               taskRunner.start();
+
+               TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
+               tct.start();
+
+               try {
+                       tct.join();
+                       // Bounded wait: an unbounded join would hang forever if the cancel is ignored.
+                       taskRunner.join(60000);
+               } catch(InterruptedException ie) {
+                       Assert.fail("Joining threads failed");
+               }
+
+               Assert.assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+               Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+       }
+       
+       // 
=================================================================================================
+       
+       /** Join function that emits the first record of every joined pair unchanged. */
+       public static final class MockMatchStub implements FlatJoinFunction<Record, Record, Record> {
+               private static final long serialVersionUID = 1L;
+
+               /** Forwards {@code first} to the collector; {@code second} is not used. */
+               @Override
+               public void join(Record first, Record second, Collector<Record> collector) throws Exception {
+                       collector.collect(first);
+               }
+       }
+       
+       /**
+        * Join function that emits the first record of each pair for its first nine invocations
+        * and throws {@link ExpectedTestException} from the tenth invocation on.
+        */
+       public static final class MockFailingMatchStub implements FlatJoinFunction<Record, Record, Record> {
+               private static final long serialVersionUID = 1L;
+
+               /** Number of times {@code join} has been invoked on this instance. */
+               private int invocations = 0;
+
+               @Override
+               public void join(Record first, Record second, Collector<Record> collector) throws Exception {
+                       this.invocations++;
+                       if (this.invocations >= 10) {
+                               throw new ExpectedTestException();
+                       }
+                       collector.collect(first);
+               }
+       }
+       
+       /** Join function that sleeps for 100 ms per invocation and emits nothing. */
+       public static final class MockDelayingMatchStub implements FlatJoinFunction<Record, Record, Record> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void join(Record first, Record second, Collector<Record> collector) throws Exception {
+                       try {
+                               Thread.sleep(100);
+                       } catch (InterruptedException ignored) {
+                               // Deliberately swallowed: an interrupt only cuts the delay short.
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
deleted file mode 100644
index bfc6c44..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
-import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class MapTaskTest extends DriverTestBase<GenericCollectorMap<Record, 
Record>> {
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(MapTaskTest.class);
-       
-       private final CountingOutputCollector output = new 
CountingOutputCollector();
-       
-       
-       public MapTaskTest(ExecutionConfig config) {
-               super(config, 0, 0);
-       }
-       
-       @Test
-       public void testMapTask() {
-               final int keyCnt = 100;
-               final int valCnt = 20;
-               
-               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-               setOutput(this.output);
-               
-               final CollectorMapDriver<Record, Record> testDriver = new 
CollectorMapDriver<Record, Record>();
-               
-               try {
-                       testDriver(testDriver, MockMapStub.class);
-               } catch (Exception e) {
-                       LOG.debug("Exception while running the test driver.", 
e);
-                       Assert.fail("Invoke method caused exception.");
-               }
-               
-               Assert.assertEquals("Wrong result set size.", keyCnt*valCnt, 
this.output.getNumberOfRecords());
-       }
-       
-       @Test
-       public void testFailingMapTask() {
-               final int keyCnt = 100;
-               final int valCnt = 20;
-               
-               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-               setOutput(new DiscardingOutputCollector<Record>());
-               
-               final CollectorMapDriver<Record, Record> testTask = new 
CollectorMapDriver<Record, Record>();
-               try {
-                       testDriver(testTask, MockFailingMapStub.class);
-                       Assert.fail("Function exception was not forwarded.");
-               } catch (ExpectedTestException e) {
-                       // good!
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Exception in test.");
-               }
-       }
-       
-       @Test
-       public void testCancelMapTask() {
-               addInput(new InfiniteInputIterator());
-               setOutput(new DiscardingOutputCollector<Record>());
-               
-               final CollectorMapDriver<Record, Record> testTask = new 
CollectorMapDriver<Record, Record>();
-               
-               final AtomicBoolean success = new AtomicBoolean(false);
-               
-               final Thread taskRunner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       testDriver(testTask, MockMapStub.class);
-                                       success.set(true);
-                               } catch (Exception ie) {
-                                       ie.printStackTrace();
-                               }
-                       }
-               };
-               taskRunner.start();
-               
-               TaskCancelThread tct = new TaskCancelThread(1, taskRunner, 
this);
-               tct.start();
-               
-               try {
-                       tct.join();
-                       taskRunner.join();              
-               } catch(InterruptedException ie) {
-                       Assert.fail("Joining threads failed");
-               }
-               
-               Assert.assertTrue("Test threw an exception even though it was 
properly canceled.", success.get());
-       }
-       
-       public static class MockMapStub extends MapFunction {
-               private static final long serialVersionUID = 1L;
-               
-               @Override
-               public void map(Record record, Collector<Record> out) throws 
Exception {
-                       out.collect(record);
-               }
-               
-       }
-       
-       public static class MockFailingMapStub extends MapFunction {
-               private static final long serialVersionUID = 1L;
-               
-               private int cnt = 0;
-               
-               @Override
-               public void map(Record record, Collector<Record> out) throws 
Exception {
-                       if (++this.cnt >= 10) {
-                               throw new ExpectedTestException();
-                       }
-                       out.collect(record);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
deleted file mode 100644
index 6f7fb21..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.junit.Assert;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import 
org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class MatchTaskExternalITCase extends 
DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
-       
-       private static final long HASH_MEM = 4*1024*1024;
-       
-       private static final long SORT_MEM = 3*1024*1024;
-       
-       private static final long BNLJN_MEM = 10 * PAGE_SIZE;
-
-       private final double bnljn_frac;
-
-       private final double hash_frac;
-
-       @SuppressWarnings("unchecked")
-       private final RecordComparator comparator1 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ 
IntValue.class });
-       
-       @SuppressWarnings("unchecked")
-       private final RecordComparator comparator2 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ 
IntValue.class });
-       
-       private final CountingOutputCollector output = new 
CountingOutputCollector();
-       
-       public MatchTaskExternalITCase(ExecutionConfig config) {
-               super(config, HASH_MEM, 2, SORT_MEM);
-               bnljn_frac = 
(double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
-               hash_frac = 
(double)HASH_MEM/this.getMemoryManager().getMemorySize();
-       }
-       
-       @Test
-       public void testExternalSort1MatchTask() {
-               final int keyCnt1 = 16384*4;
-               final int valCnt1 = 2;
-               
-               final int keyCnt2 = 8192;
-               final int valCnt2 = 4*2;
-               
-               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               
-               setOutput(this.output);
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       addInputSorted(new UniformRecordGenerator(keyCnt1, 
valCnt1, false), this.comparator1.duplicate());
-                       addInputSorted(new UniformRecordGenerator(keyCnt2, 
valCnt2, false), this.comparator2.duplicate());
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-               
-               Assert.assertEquals("Wrong result set size.", expCnt, 
this.output.getNumberOfRecords());
-       }
-       
-       @Test
-       public void testExternalHash1MatchTask() {
-               final int keyCnt1 = 32768;
-               final int valCnt1 = 8;
-               
-               final int keyCnt2 = 65536;
-               final int valCnt2 = 8;
-               
-               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               
-               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               setOutput(this.output);
-               
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-               getTaskConfig().setRelativeMemoryDriver(hash_frac);
-               
-               JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Test caused an exception.");
-               }
-               
-               Assert.assertEquals("Wrong result set size.", expCnt, 
this.output.getNumberOfRecords());
-       }
-       
-       @Test
-       public void testExternalHash2MatchTask() {
-               final int keyCnt1 = 32768;
-               final int valCnt1 = 8;
-               
-               final int keyCnt2 = 65536;
-               final int valCnt2 = 8;
-               
-               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               
-               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               setOutput(this.output);
-               
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-               getTaskConfig().setRelativeMemoryDriver(hash_frac);
-               
-               JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Test caused an exception.");
-               }
-               
-               Assert.assertEquals("Wrong result set size.", expCnt, 
this.output.getNumberOfRecords());
-       }
-       
-       public static final class MockMatchStub extends JoinFunction {
-               private static final long serialVersionUID = 1L;
-               
-               @Override
-               public void join(Record value1, Record value2, 
Collector<Record> out) throws Exception {
-                       out.collect(value1);
-               }
-       }
-}

Reply via email to