[FLINK-2105] [tests] Move duplicate utility classes to testutil package

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df9f4819
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df9f4819
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df9f4819

Branch: refs/heads/master
Commit: df9f4819b9368600c7531dbf4d4ec42c1cddea8f
Parents: db0b008
Author: r-pogalz <r.pog...@campus.tu-berlin.de>
Authored: Mon Aug 3 12:59:01 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Aug 4 21:35:27 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/operators/MatchDriver.java    |   8 +-
 .../sort/AbstractMergeInnerJoinIterator.java    | 108 ++++++
 .../sort/AbstractMergeMatchIterator.java        | 107 ------
 .../sort/NonReusingMergeInnerJoinIterator.java  |  59 +++
 .../sort/NonReusingMergeMatchIterator.java      |  59 ---
 .../sort/ReusingMergeInnerJoinIterator.java     |  64 ++++
 .../sort/ReusingMergeMatchIterator.java         |  64 ----
 ...ReusingSortMergeInnerJoinIteratorITCase.java | 318 ++++++++++++++++
 .../NonReusingSortMergeMatchIteratorITCase.java | 371 -------------------
 ...ReusingSortMergeInnerJoinIteratorITCase.java | 318 ++++++++++++++++
 .../ReusingSortMergeMatchIteratorITCase.java    | 371 -------------------
 .../operators/testutils/CollectionIterator.java |  61 +++
 .../runtime/operators/testutils/Match.java      |  63 ++++
 .../testutils/MatchRemovingMatcher.java         |  58 +++
 .../testutils/SimpleTupleJoinFunction.java      |  41 ++
 .../operators/util/HashVsSortMiniBenchmark.java |   6 +-
 16 files changed, 1097 insertions(+), 979 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
index 0381aab..e54fca5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators;
 import org.apache.flink.api.common.ExecutionConfig;
 import 
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
 import 
org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.NonReusingMergeMatchIterator;
+import 
org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import 
org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
 import 
org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.ReusingMergeMatchIterator;
+import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
@@ -126,7 +126,7 @@ public class MatchDriver<IT1, IT2, OT> implements 
PactDriver<FlatJoinFunction<IT
                if (this.objectReuseEnabled) {
                        switch (ls) {
                                case MERGE:
-                                       this.matchIterator = new 
ReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, 
serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, 
comparator2), memoryManager, ioManager, numPages, 
this.taskContext.getOwningNepheleTask());
+                                       this.matchIterator = new 
ReusingMergeInnerJoinIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, 
serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, 
comparator2), memoryManager, ioManager, numPages, 
this.taskContext.getOwningNepheleTask());
 
                                        break;
                                case HYBRIDHASH_BUILD_FIRST:
@@ -141,7 +141,7 @@ public class MatchDriver<IT1, IT2, OT> implements 
PactDriver<FlatJoinFunction<IT
                } else {
                        switch (ls) {
                                case MERGE:
-                                       this.matchIterator = new 
NonReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, 
serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, 
comparator2), memoryManager, ioManager, numPages, 
this.taskContext.getOwningNepheleTask());
+                                       this.matchIterator = new 
NonReusingMergeInnerJoinIterator<IT1, IT2, OT>(in1, in2, serializer1, 
comparator1, serializer2, comparator2, 
pairComparatorFactory.createComparator12(comparator1, comparator2), 
memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 
                                        break;
                                case HYBRIDHASH_BUILD_FIRST:

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java
new file mode 100644
index 0000000..e9ccf52
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * joining through a sort-merge join strategy.
+ */
+public abstract class AbstractMergeInnerJoinIterator<T1, T2, O> extends 
AbstractMergeIterator<T1, T2, O> {
+
+       public AbstractMergeInnerJoinIterator(
+                       MutableObjectIterator<T1> input1, 
MutableObjectIterator<T2> input2,
+                       TypeSerializer<T1> serializer1, TypeComparator<T1> 
comparator1,
+                       TypeSerializer<T2> serializer2, TypeComparator<T2> 
comparator2,
+                       TypePairComparator<T1, T2> pairComparator,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       int numMemoryPages,
+                       AbstractInvokable parentTask)
+                       throws MemoryAllocationException {
+               super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+       }
+
+       /**
+        * Calls the <code>JoinFunction#join()</code> method for all two 
key-value pairs that share the same key and come
+        * from different inputs. The output of the <code>join()</code> method 
is forwarded.
+        * <p/>
+        * This method first zig-zags between the two sorted inputs in order to 
find a common
+        * key, and then calls the join stub with the cross product of the 
values.
+        *
+        * @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+        * @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+        */
+       @Override
+       public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> 
joinFunction, final Collector<O> collector)
+                       throws Exception {
+               if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
+                       // consume all remaining keys (hack to prevent 
remaining inputs during iterations, lets get rid of this soon)
+                       while (this.iterator1.nextKey()) ;
+                       while (this.iterator2.nextKey()) ;
+
+                       return false;
+               }
+
+               final TypePairComparator<T1, T2> comparator = 
this.pairComparator;
+               comparator.setReference(this.iterator1.getCurrent());
+               T2 current2 = this.iterator2.getCurrent();
+
+               // zig zag
+               while (true) {
+                       // determine the relation between the (possibly 
composite) keys
+                       final int comp = 
comparator.compareToReference(current2);
+
+                       if (comp == 0) {
+                               break;
+                       }
+
+                       if (comp < 0) {
+                               if (!this.iterator2.nextKey()) {
+                                       return false;
+                               }
+                               current2 = this.iterator2.getCurrent();
+                       } else {
+                               if (!this.iterator1.nextKey()) {
+                                       return false;
+                               }
+                               
comparator.setReference(this.iterator1.getCurrent());
+                       }
+               }
+
+               // here, we have a common key! call the join function with the 
cross product of the
+               // values
+               final Iterator<T1> values1 = this.iterator1.getValues();
+               final Iterator<T2> values2 = this.iterator2.getValues();
+
+               crossMatchingGroup(values1, values2, joinFunction, collector);
+               return true;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
deleted file mode 100644
index 791494d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.util.Iterator;
-
-/**
- * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
- * matching through a sort-merge join strategy.
- */
-public abstract class AbstractMergeMatchIterator<T1, T2, O> extends 
AbstractMergeIterator<T1, T2, O> {
-
-       public AbstractMergeMatchIterator(MutableObjectIterator<T1> input1, 
MutableObjectIterator<T2> input2,
-                                                                       
TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
-                                                                       
TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
-                                                                       
TypePairComparator<T1, T2> pairComparator,
-                                                                       
MemoryManager memoryManager,
-                                                                       
IOManager ioManager,
-                                                                       int 
numMemoryPages,
-                                                                       
AbstractInvokable parentTask)
-                       throws MemoryAllocationException {
-               super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
-       }
-
-       /**
-        * Calls the <code>JoinFunction#match()</code> method for all two 
key-value pairs that share the same key and come
-        * from different inputs. The output of the <code>match()</code> method 
is forwarded.
-        * <p>
-        * This method first zig-zags between the two sorted inputs in order to 
find a common
-        * key, and then calls the match stub with the cross product of the 
values.
-        *
-        * @throws Exception Forwards all exceptions from the user code and the 
I/O system.
-        * @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
-        */
-       @Override
-       public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> 
matchFunction, final Collector<O> collector)
-                       throws Exception {
-               if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
-                       // consume all remaining keys (hack to prevent 
remaining inputs during iterations, lets get rid of this soon)
-                       while (this.iterator1.nextKey()) ;
-                       while (this.iterator2.nextKey()) ;
-
-                       return false;
-               }
-
-               final TypePairComparator<T1, T2> comparator = 
this.pairComparator;
-               comparator.setReference(this.iterator1.getCurrent());
-               T2 current2 = this.iterator2.getCurrent();
-
-               // zig zag
-               while (true) {
-                       // determine the relation between the (possibly 
composite) keys
-                       final int comp = 
comparator.compareToReference(current2);
-
-                       if (comp == 0) {
-                               break;
-                       }
-
-                       if (comp < 0) {
-                               if (!this.iterator2.nextKey()) {
-                                       return false;
-                               }
-                               current2 = this.iterator2.getCurrent();
-                       } else {
-                               if (!this.iterator1.nextKey()) {
-                                       return false;
-                               }
-                               
comparator.setReference(this.iterator1.getCurrent());
-                       }
-               }
-
-               // here, we have a common key! call the match function with the 
cross product of the
-               // values
-               final Iterator<T1> values1 = this.iterator1.getValues();
-               final Iterator<T2> values2 = this.iterator2.getValues();
-
-               crossMatchingGroup(values1, values2, matchFunction, collector);
-               return true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java
new file mode 100644
index 0000000..644084c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+public class NonReusingMergeInnerJoinIterator<T1, T2, O> extends 
AbstractMergeInnerJoinIterator<T1, T2, O> {
+
+       public NonReusingMergeInnerJoinIterator(
+                       MutableObjectIterator<T1> input1,
+                       MutableObjectIterator<T2> input2,
+                       TypeSerializer<T1> serializer1, TypeComparator<T1> 
comparator1,
+                       TypeSerializer<T2> serializer2, TypeComparator<T2> 
comparator2,
+                       TypePairComparator<T1, T2> pairComparator,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       int numMemoryPages,
+                       AbstractInvokable parentTask)
+                       throws MemoryAllocationException {
+               super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+       }
+
+       @Override
+       protected <T> KeyGroupedIterator<T> 
createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> 
serializer, TypeComparator<T> comparator) {
+               return new NonReusingKeyGroupedIterator<T>(input, comparator);
+       }
+
+       @Override
+       protected <T> T createCopy(TypeSerializer<T> serializer, T value, T 
reuse) {
+               return serializer.copy(value);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
deleted file mode 100644
index 9705778..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.util.KeyGroupedIterator;
-import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-public class NonReusingMergeMatchIterator<T1, T2, O> extends 
AbstractMergeMatchIterator<T1, T2, O> {
-
-       public NonReusingMergeMatchIterator(
-                       MutableObjectIterator<T1> input1,
-                       MutableObjectIterator<T2> input2,
-                       TypeSerializer<T1> serializer1, TypeComparator<T1> 
comparator1,
-                       TypeSerializer<T2> serializer2, TypeComparator<T2> 
comparator2,
-                       TypePairComparator<T1, T2> pairComparator,
-                       MemoryManager memoryManager,
-                       IOManager ioManager,
-                       int numMemoryPages,
-                       AbstractInvokable parentTask)
-                       throws MemoryAllocationException {
-               super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
-       }
-
-       @Override
-       protected <T> KeyGroupedIterator<T> 
createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> 
serializer, TypeComparator<T> comparator) {
-               return new NonReusingKeyGroupedIterator<T>(input, comparator);
-       }
-
-       @Override
-       protected <T> T createCopy(TypeSerializer<T> serializer, T value, T 
reuse) {
-               return serializer.copy(value);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java
new file mode 100644
index 0000000..3a1a17a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+public class ReusingMergeInnerJoinIterator<T1, T2, O> extends 
AbstractMergeInnerJoinIterator<T1, T2, O> {
+
+       public ReusingMergeInnerJoinIterator(
+                       MutableObjectIterator<T1> input1,
+                       MutableObjectIterator<T2> input2,
+                       TypeSerializer<T1> serializer1, TypeComparator<T1> 
comparator1,
+                       TypeSerializer<T2> serializer2, TypeComparator<T2> 
comparator2,
+                       TypePairComparator<T1, T2> pairComparator,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       int numMemoryPages,
+                       AbstractInvokable parentTask)
+                       throws MemoryAllocationException {
+               super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+               this.copy1 = serializer1.createInstance();
+               this.spillHeadCopy = serializer1.createInstance();
+               this.copy2 = serializer2.createInstance();
+               this.blockHeadCopy = serializer2.createInstance();
+       }
+
+       @Override
+       protected <T> KeyGroupedIterator<T> 
createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> 
serializer, TypeComparator<T> comparator) {
+               return new ReusingKeyGroupedIterator<T>(input, serializer, 
comparator);
+       }
+
+       @Override
+       protected <T> T createCopy(TypeSerializer<T> serializer, T value, T 
reuse) {
+               return serializer.copy(value, reuse);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
deleted file mode 100644
index c9cf5a2..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.util.KeyGroupedIterator;
-import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-public class ReusingMergeMatchIterator<T1, T2, O> extends 
AbstractMergeMatchIterator<T1, T2, O> {
-
-       public ReusingMergeMatchIterator(
-                       MutableObjectIterator<T1> input1,
-                       MutableObjectIterator<T2> input2,
-                       TypeSerializer<T1> serializer1, TypeComparator<T1> 
comparator1,
-                       TypeSerializer<T2> serializer2, TypeComparator<T2> 
comparator2,
-                       TypePairComparator<T1, T2> pairComparator,
-                       MemoryManager memoryManager,
-                       IOManager ioManager,
-                       int numMemoryPages,
-                       AbstractInvokable parentTask)
-                       throws MemoryAllocationException {
-               super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
-
-               this.copy1 = serializer1.createInstance();
-               this.spillHeadCopy = serializer1.createInstance();
-               this.copy2 = serializer2.createInstance();
-               this.blockHeadCopy = serializer2.createInstance();
-       }
-
-       @Override
-       protected <T> KeyGroupedIterator<T> 
createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> 
serializer, TypeComparator<T> comparator) {
-               return new ReusingKeyGroupedIterator<T>(input, serializer, 
comparator);
-       }
-
-       @Override
-       protected <T> T createCopy(TypeSerializer<T> serializer, T value, T 
reuse) {
-               return serializer.copy(value, reuse);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
new file mode 100644
index 0000000..7fc3734
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.*;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+@SuppressWarnings("deprecation")
+public class NonReusingSortMergeInnerJoinIteratorITCase {
+       
+       // total memory
+       private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+       private static final int PAGES_FOR_BNLJN = 2;
+
+       // the size of the left and right inputs
+       private static final int INPUT_1_SIZE = 20000;
+
+       private static final int INPUT_2_SIZE = 1000;
+
+       // random seeds for the left and right input data generators
+       private static final long SEED1 = 561349061987311L;
+
+       private static final long SEED2 = 231434613412342L;
+       
+       // dummy abstract task
+       private final AbstractInvokable parentTask = new DummyInvokable();
+
+       private IOManager ioManager;
+       private MemoryManager memoryManager;
+
+       private TypeSerializer<Tuple2<Integer, String>> serializer1;
+       private TypeSerializer<Tuple2<Integer, String>> serializer2;
+       private TypeComparator<Tuple2<Integer, String>> comparator1;
+       private TypeComparator<Tuple2<Integer, String>> comparator2;
+       private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, 
String>> pairComparator;
+       
+
+       @SuppressWarnings("unchecked")
+       @Before
+       public void beforeTest() {
+               serializer1 = new TupleSerializer<Tuple2<Integer, String>>(
+                               (Class<Tuple2<Integer, String>>) (Class<?>) 
Tuple2.class,
+                               new TypeSerializer<?>[] { 
IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+               serializer2 = new TupleSerializer<Tuple2<Integer, String>>(
+                               (Class<Tuple2<Integer, String>>) (Class<?>) 
Tuple2.class,
+                               new TypeSerializer<?>[] { 
IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+               comparator1 =  new TupleComparator<Tuple2<Integer, String>>(
+                               new int[]{0},
+                               new TypeComparator<?>[] { new 
IntComparator(true) },
+                               new TypeSerializer<?>[] { 
IntSerializer.INSTANCE });
+               comparator2 =  new TupleComparator<Tuple2<Integer, String>>(
+                               new int[]{0},
+                               new TypeComparator<?>[] { new 
IntComparator(true) },
+                               new TypeSerializer<?>[] { 
IntSerializer.INSTANCE });
+               pairComparator = new GenericPairComparator<Tuple2<Integer, 
String>, Tuple2<Integer, String>>(comparator1, comparator2);
+               
+               this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+               this.ioManager = new IOManagerAsync();
+       }
+
+       @After
+       public void afterTest() {
+               if (this.ioManager != null) {
+                       this.ioManager.shutdown();
+                       if (!this.ioManager.isProperlyShutDown()) {
+                               Assert.fail("I/O manager failed to properly 
shut down.");
+                       }
+                       this.ioManager = null;
+               }
+               
+               if (this.memoryManager != null) {
+                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
+                               this.memoryManager.verifyEmpty());
+                       this.memoryManager.shutdown();
+                       this.memoryManager = null;
+               }
+       }
+
+       @Test
+       public void testMerge() {
+               try {
+
+                       final TupleGenerator generator1 = new 
TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+                       final TupleGenerator generator2 = new 
TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+
+                       final TestData.TupleGeneratorIterator input1 = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+                       // collect expected data
+                       final Map<Integer, Collection<Match>> 
expectedMatchesMap = matchValues(
+                                       collectData(input1),
+                                       collectData(input2));
+
+                       final FlatJoinFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
+                                       new 
MatchRemovingMatcher(expectedMatchesMap);
+
+                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<Tuple2<Integer, String>>();
+       
+                       // reset the generators
+                       generator1.reset();
+                       generator2.reset();
+                       input1.reset();
+                       input2.reset();
+       
+                       // compare with iterator values
+                       NonReusingMergeInnerJoinIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                               new 
NonReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, 
String>, Tuple2<Integer, String>>(
+                                       input1, input2, this.serializer1, 
this.comparator1, this.serializer2, this.comparator2,
+                                       this.pairComparator, 
this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+       
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(joinFunction, 
collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<Integer, Collection<Match>> entry : 
expectedMatchesMap.entrySet()) {
+                               Assert.assertTrue("Collection for key " + 
entry.getKey() + " is not empty", entry.getValue().isEmpty());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testMergeWithHighNumberOfCommonKeys()
+       {
+               // the size of the left and right inputs
+               final int INPUT_1_SIZE = 200;
+               final int INPUT_2_SIZE = 100;
+               
+               final int INPUT_1_DUPLICATES = 10;
+               final int INPUT_2_DUPLICATES = 4000;
+               final int DUPLICATE_KEY = 13;
+               
+               try {
+                       final TupleGenerator generator1 = new 
TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+                       final TupleGenerator generator2 = new 
TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+
+                       final TestData.TupleGeneratorIterator gen1Iter = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator gen2Iter = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+                       final TestData.TupleConstantValueIterator const1Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for 
Duplicate Keys", INPUT_1_DUPLICATES);
+                       final TestData.TupleConstantValueIterator const2Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for 
Duplicate Keys", INPUT_2_DUPLICATES);
+
+                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList1 = new ArrayList<MutableObjectIterator<Tuple2<Integer, 
String>>>();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+
+                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList2 = new ArrayList<MutableObjectIterator<Tuple2<Integer, 
String>>>();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+
+                       MutableObjectIterator<Tuple2<Integer, String>> input1 = 
new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+                       MutableObjectIterator<Tuple2<Integer, String>> input2 = 
new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+                       
+                       // collect expected data
+                       final Map<Integer, Collection<Match>> 
expectedMatchesMap = matchValues(
+                               collectData(input1),
+                               collectData(input2));
+                       
+                       // re-create the whole thing for actual processing
+                       
+                       // reset the generators and iterators
+                       generator1.reset();
+                       generator2.reset();
+                       const1Iter.reset();
+                       const2Iter.reset();
+                       gen1Iter.reset();
+                       gen2Iter.reset();
+                       
+                       inList1.clear();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+                       
+                       inList2.clear();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+       
+                       input1 = new MergeIterator<Tuple2<Integer, 
String>>(inList1, comparator1.duplicate());
+                       input2 = new MergeIterator<Tuple2<Integer, 
String>>(inList2, comparator2.duplicate());
+                       
+                       final FlatJoinFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new 
MatchRemovingMatcher(expectedMatchesMap);
+                       
+                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<Tuple2<Integer, String>>();
+       
+                       
+                       // we create this sort-merge iterator with little 
memory for the block-nested-loops fall-back to make sure it
+                       // needs to spill for the duplicate keys
+                       NonReusingMergeInnerJoinIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                               new 
NonReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, 
String>, Tuple2<Integer, String>>(
+                                       input1, input2, this.serializer1, 
this.comparator1, this.serializer2, this.comparator2,
+                                       this.pairComparator, 
this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+       
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(joinFunction, 
collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<Integer, Collection<Match>> entry : 
expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                                    Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       private Map<Integer, Collection<Match>> matchValues(
+                       Map<Integer, Collection<String>> leftMap,
+                       Map<Integer, Collection<String>> rightMap)
+       {
+               Map<Integer, Collection<Match>> map = new HashMap<Integer, 
Collection<Match>>();
+
+               for (Integer key : leftMap.keySet()) {
+                       Collection<String> leftValues = leftMap.get(key);
+                       Collection<String> rightValues = rightMap.get(key);
+
+                       if (rightValues == null) {
+                               continue;
+                       }
+
+                       if (!map.containsKey(key)) {
+                               map.put(key, new ArrayList<Match>());
+                       }
+
+                       Collection<Match> matchedValues = map.get(key);
+
+                       for (String leftValue : leftValues) {
+                               for (String rightValue : rightValues) {
+                                       matchedValues.add(new Match(leftValue, 
rightValue));
+                               }
+                       }
+               }
+
+               return map;
+       }
+
+
+       private Map<Integer, Collection<String>> 
collectData(MutableObjectIterator<Tuple2<Integer, String>> iter)
+                       throws Exception
+       {
+               Map<Integer, Collection<String>> map = new HashMap<Integer, 
Collection<String>>();
+               Tuple2<Integer, String> pair = new Tuple2<Integer, String>();
+
+               while ((pair = iter.next(pair)) != null) {
+                       final Integer key = pair.getField(0);
+
+                       if (!map.containsKey(key)) {
+                               map.put(key, new ArrayList<String>());
+                       }
+
+                       Collection<String> values = map.get(key);
+                       final String value = pair.getField(1);
+                       values.add(value);
+               }
+
+               return map;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java
deleted file mode 100644
index 757b2e7..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class NonReusingSortMergeMatchIteratorITCase {
-       
-       // total memory
-       private static final int MEMORY_SIZE = 1024 * 1024 * 16;
-       private static final int PAGES_FOR_BNLJN = 2;
-
-       // the size of the left and right inputs
-       private static final int INPUT_1_SIZE = 20000;
-
-       private static final int INPUT_2_SIZE = 1000;
-
-       // random seeds for the left and right input data generators
-       private static final long SEED1 = 561349061987311L;
-
-       private static final long SEED2 = 231434613412342L;
-       
-       // dummy abstract task
-       private final AbstractInvokable parentTask = new DummyInvokable();
-
-       private IOManager ioManager;
-       private MemoryManager memoryManager;
-       
-       private TypeSerializer<Record> serializer1;
-       private TypeSerializer<Record> serializer2;
-       private TypeComparator<Record> comparator1;
-       private TypeComparator<Record> comparator2;
-       private TypePairComparator<Record, Record> pairComparator;
-       
-
-       @SuppressWarnings("unchecked")
-       @Before
-       public void beforeTest() {
-               this.serializer1 = RecordSerializer.get();
-               this.serializer2 = RecordSerializer.get();
-               this.comparator1 = new RecordComparator(new int[] {0}, new 
Class[]{TestData.Key.class});
-               this.comparator2 = new RecordComparator(new int[] {0}, new 
Class[]{TestData.Key.class});
-               this.pairComparator = new RecordPairComparator(new int[] {0}, 
new int[] {0}, new Class[]{TestData.Key.class});
-               
-               this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-               this.ioManager = new IOManagerAsync();
-       }
-
-       @After
-       public void afterTest() {
-               if (this.ioManager != null) {
-                       this.ioManager.shutdown();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
-                       this.ioManager = null;
-               }
-               
-               if (this.memoryManager != null) {
-                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
-                               this.memoryManager.verifyEmpty());
-                       this.memoryManager.shutdown();
-                       this.memoryManager = null;
-               }
-       }
-
-
-       
-       @Test
-       public void testMerge() {
-               try {
-                       
-                       final TestData.Generator generator1 = new 
Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-                       final TestData.Generator generator2 = new 
Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-
-                       final TestData.GeneratorIterator input1 = new 
TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.GeneratorIterator input2 = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       // collect expected data
-                       final Map<TestData.Key, Collection<Match>> 
expectedMatchesMap = matchValues(
-                               collectData(input1),
-                               collectData(input2));
-                       
-                       final JoinFunction matcher = new 
MatchRemovingMatcher(expectedMatchesMap);
-                       
-                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
-       
-                       // reset the generators
-                       generator1.reset();
-                       generator2.reset();
-                       input1.reset();
-                       input2.reset();
-       
-                       // compare with iterator values
-                       NonReusingMergeMatchIterator<Record, Record, Record> 
iterator =
-                               new NonReusingMergeMatchIterator<Record, 
Record, Record>(
-                                       input1, input2, this.serializer1, 
this.comparator1, this.serializer2, this.comparator2,
-                                       this.pairComparator, 
this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
-       
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<TestData.Key, Collection<Match>> entry : 
expectedMatchesMap.entrySet()) {
-                               Assert.assertTrue("Collection for key " + 
entry.getKey() + " is not empty", entry.getValue().isEmpty());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testMergeWithHighNumberOfCommonKeys()
-       {
-               // the size of the left and right inputs
-               final int INPUT_1_SIZE = 200;
-               final int INPUT_2_SIZE = 100;
-               
-               final int INPUT_1_DUPLICATES = 10;
-               final int INPUT_2_DUPLICATES = 4000;
-               final int DUPLICATE_KEY = 13;
-               
-               try {
-                       final TestData.Generator generator1 = new 
Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-                       final TestData.Generator generator2 = new 
Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-                       
-                       final TestData.GeneratorIterator gen1Iter = new 
TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.GeneratorIterator gen2Iter = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       final TestData.ConstantValueIterator const1Iter = new 
TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", 
INPUT_1_DUPLICATES);
-                       final TestData.ConstantValueIterator const2Iter = new 
TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate 
Keys", INPUT_2_DUPLICATES);
-                       
-                       final List<MutableObjectIterator<Record>> inList1 = new 
ArrayList<MutableObjectIterator<Record>>();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       final List<MutableObjectIterator<Record>> inList2 = new 
ArrayList<MutableObjectIterator<Record>>();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-                       
-                       MutableObjectIterator<Record> input1 = new 
MergeIterator<Record>(inList1, comparator1.duplicate());
-                       MutableObjectIterator<Record> input2 = new 
MergeIterator<Record>(inList2, comparator2.duplicate());
-                       
-                       // collect expected data
-                       final Map<TestData.Key, Collection<Match>> 
expectedMatchesMap = matchValues(
-                               collectData(input1),
-                               collectData(input2));
-                       
-                       // re-create the whole thing for actual processing
-                       
-                       // reset the generators and iterators
-                       generator1.reset();
-                       generator2.reset();
-                       const1Iter.reset();
-                       const2Iter.reset();
-                       gen1Iter.reset();
-                       gen2Iter.reset();
-                       
-                       inList1.clear();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       inList2.clear();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-       
-                       input1 = new MergeIterator<Record>(inList1, 
comparator1.duplicate());
-                       input2 = new MergeIterator<Record>(inList2, 
comparator2.duplicate());
-                       
-                       final JoinFunction matcher = new 
MatchRemovingMatcher(expectedMatchesMap);
-                       
-                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
-       
-                       
-                       // we create this sort-merge iterator with little 
memory for the block-nested-loops fall-back to make sure it
-                       // needs to spill for the duplicate keys
-                       NonReusingMergeMatchIterator<Record, Record, Record> 
iterator =
-                               new NonReusingMergeMatchIterator<Record, 
Record, Record>(
-                                       input1, input2, this.serializer1, 
this.comparator1, this.serializer2, this.comparator2,
-                                       this.pairComparator, 
this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
-       
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<TestData.Key, Collection<Match>> entry : 
expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                    Utilities
-       // 
--------------------------------------------------------------------------------------------
-
-       private Map<TestData.Key, Collection<Match>> matchValues(
-                       Map<TestData.Key, Collection<TestData.Value>> leftMap,
-                       Map<TestData.Key, Collection<TestData.Value>> rightMap)
-       {
-               Map<TestData.Key, Collection<Match>> map = new 
HashMap<TestData.Key, Collection<Match>>();
-
-               for (TestData.Key key : leftMap.keySet()) {
-                       Collection<TestData.Value> leftValues = 
leftMap.get(key);
-                       Collection<TestData.Value> rightValues = 
rightMap.get(key);
-
-                       if (rightValues == null) {
-                               continue;
-                       }
-
-                       if (!map.containsKey(key)) {
-                               map.put(key, new ArrayList<Match>());
-                       }
-
-                       Collection<Match> matchedValues = map.get(key);
-
-                       for (TestData.Value leftValue : leftValues) {
-                               for (TestData.Value rightValue : rightValues) {
-                                       matchedValues.add(new Match(leftValue, 
rightValue));
-                               }
-                       }
-               }
-
-               return map;
-       }
-
-       
-       private Map<TestData.Key, Collection<TestData.Value>> 
collectData(MutableObjectIterator<Record> iter)
-       throws Exception
-       {
-               Map<TestData.Key, Collection<TestData.Value>> map = new 
HashMap<TestData.Key, Collection<TestData.Value>>();
-               Record pair = new Record();
-               
-               while ((pair = iter.next(pair)) != null) {
-                       TestData.Key key = pair.getField(0, TestData.Key.class);
-                       
-                       if (!map.containsKey(key)) {
-                               map.put(new TestData.Key(key.getKey()), new 
ArrayList<TestData.Value>());
-                       }
-
-                       Collection<TestData.Value> values = map.get(key);
-                       values.add(new TestData.Value(pair.getField(1, 
TestData.Value.class).getValue()));
-               }
-
-               return map;
-       }
-
-       /**
-        * Private class used for storage of the expected matches in a hashmap.
-        */
-       private static class Match {
-               private final Value left;
-
-               private final Value right;
-
-               public Match(Value left, Value right) {
-                       this.left = left;
-                       this.right = right;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       Match o = (Match) obj;
-                       return this.left.equals(o.left) && 
this.right.equals(o.right);
-               }
-               
-               @Override
-               public int hashCode() {
-                       return this.left.hashCode() ^ this.right.hashCode();
-               }
-
-               @Override
-               public String toString() {
-                       return left + ", " + right;
-               }
-       }
-       
-       private static final class MatchRemovingMatcher extends JoinFunction {
-               private static final long serialVersionUID = 1L;
-               
-               private final Map<TestData.Key, Collection<Match>> toRemoveFrom;
-               
-               protected MatchRemovingMatcher(Map<TestData.Key, 
Collection<Match>> map) {
-                       this.toRemoveFrom = map;
-               }
-               
-               @Override
-               public void join(Record rec1, Record rec2, Collector<Record> 
out) throws Exception {
-                       TestData.Key key = rec1.getField(0, TestData.Key.class);
-                       TestData.Value value1 = rec1.getField(1, 
TestData.Value.class);
-                       TestData.Value value2 = rec2.getField(1, 
TestData.Value.class);
-                       
-                       Collection<Match> matches = this.toRemoveFrom.get(key);
-                       if (matches == null) {
-                               Assert.fail("Match " + key + " - " + value1 + 
":" + value2 + " is unexpected.");
-                       }
-                       
-                       boolean contained = matches.remove(new Match(value1, 
value2));
-                       if (!contained) {
-                               Assert.fail("Produced match was not contained: 
" + key + " - " + value1 + ":" + value2);
-                       }
-                       if (matches.isEmpty()) {
-                               this.toRemoveFrom.remove(key);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
new file mode 100644
index 0000000..e4eec86
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.*;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+@SuppressWarnings("deprecation")
+public class ReusingSortMergeInnerJoinIteratorITCase {
+
+       // total memory
+       private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+       private static final int PAGES_FOR_BNLJN = 2;
+
+       // the size of the left and right inputs
+       private static final int INPUT_1_SIZE = 20000;
+
+       private static final int INPUT_2_SIZE = 1000;
+
+       // random seeds for the left and right input data generators
+       private static final long SEED1 = 561349061987311L;
+
+       private static final long SEED2 = 231434613412342L;
+
+       // dummy abstract task
+       private final AbstractInvokable parentTask = new DummyInvokable();
+
+       private IOManager ioManager;
+       private MemoryManager memoryManager;
+
+       private TypeSerializer<Tuple2<Integer, String>> serializer1;
+       private TypeSerializer<Tuple2<Integer, String>> serializer2;
+       private TypeComparator<Tuple2<Integer, String>> comparator1;
+       private TypeComparator<Tuple2<Integer, String>> comparator2;
+       private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, 
String>> pairComparator;
+
+
+       @SuppressWarnings("unchecked")
+       @Before
+       public void beforeTest() {
+               serializer1 = new TupleSerializer<Tuple2<Integer, String>>(
+                               (Class<Tuple2<Integer, String>>) (Class<?>) 
Tuple2.class,
+                               new TypeSerializer<?>[] { 
IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+               serializer2 = new TupleSerializer<Tuple2<Integer, String>>(
+                               (Class<Tuple2<Integer, String>>) (Class<?>) 
Tuple2.class,
+                               new TypeSerializer<?>[] { 
IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+               comparator1 =  new TupleComparator<Tuple2<Integer, String>>(
+                               new int[]{0},
+                               new TypeComparator<?>[] { new 
IntComparator(true) },
+                               new TypeSerializer<?>[] { 
IntSerializer.INSTANCE });
+               comparator2 =  new TupleComparator<Tuple2<Integer, String>>(
+                               new int[]{0},
+                               new TypeComparator<?>[] { new 
IntComparator(true) },
+                               new TypeSerializer<?>[] { 
IntSerializer.INSTANCE });
+               pairComparator = new GenericPairComparator<Tuple2<Integer, 
String>, Tuple2<Integer, String>>(comparator1, comparator2);
+
+               this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+               this.ioManager = new IOManagerAsync();
+       }
+
+       @After
+       public void afterTest() {
+               if (this.ioManager != null) {
+                       this.ioManager.shutdown();
+                       if (!this.ioManager.isProperlyShutDown()) {
+                               Assert.fail("I/O manager failed to properly 
shut down.");
+                       }
+                       this.ioManager = null;
+               }
+
+               if (this.memoryManager != null) {
+                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
+                               this.memoryManager.verifyEmpty());
+                       this.memoryManager.shutdown();
+                       this.memoryManager = null;
+               }
+       }
+
+       @Test
+       public void testMerge() {
+               try {
+
+                       final TupleGenerator generator1 = new 
TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+                       final TupleGenerator generator2 = new 
TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+
+                       final TestData.TupleGeneratorIterator input1 = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+                       // collect expected data
+                       final Map<Integer, Collection<Match>> 
expectedMatchesMap = matchValues(
+                               collectData(input1),
+                               collectData(input2));
+
+                       final FlatJoinFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
+                                       new 
MatchRemovingMatcher(expectedMatchesMap);
+
+                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<Tuple2<Integer, String>>();
+
+                       // reset the generators
+                       generator1.reset();
+                       generator2.reset();
+                       input1.reset();
+                       input2.reset();
+
+                       // compare with iterator values
+                       ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                               new 
ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, 
Tuple2<Integer, String>>(
+                                       input1, input2, this.serializer1, 
this.comparator1, this.serializer2, this.comparator2,
+                                       this.pairComparator, 
this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+
+                       iterator.open();
+
+                       while (iterator.callWithNextKey(joinFunction, 
collector));
+
+                       iterator.close();
+
+                       // assert that each expected match was seen
+                       for (Entry<Integer, Collection<Match>> entry : 
expectedMatchesMap.entrySet()) {
+                               Assert.assertTrue("Collection for key " + 
entry.getKey() + " is not empty", entry.getValue().isEmpty());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+
+       @Test
+       public void testMergeWithHighNumberOfCommonKeys()
+       {
+               // the size of the left and right inputs
+               final int INPUT_1_SIZE = 200;
+               final int INPUT_2_SIZE = 100;
+
+               final int INPUT_1_DUPLICATES = 10;
+               final int INPUT_2_DUPLICATES = 4000;
+               final int DUPLICATE_KEY = 13;
+
+               try {
+                       final TupleGenerator generator1 = new 
TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+                       final TupleGenerator generator2 = new 
TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+                       
+                       final TestData.TupleGeneratorIterator gen1Iter = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator gen2Iter = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+                       
+                       final TestData.TupleConstantValueIterator const1Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for 
Duplicate Keys", INPUT_1_DUPLICATES);
+                       final TestData.TupleConstantValueIterator const2Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for 
Duplicate Keys", INPUT_2_DUPLICATES);
+                       
+                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList1 = new ArrayList<MutableObjectIterator<Tuple2<Integer, 
String>>>();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+                       
+                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList2 = new ArrayList<MutableObjectIterator<Tuple2<Integer, 
String>>>();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+                       
+                       MutableObjectIterator<Tuple2<Integer, String>> input1 = 
new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+                       MutableObjectIterator<Tuple2<Integer, String>> input2 = 
new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+                       
+                       // collect expected data
+                       final Map<Integer, Collection<Match>> 
expectedMatchesMap = matchValues(
+                               collectData(input1),
+                               collectData(input2));
+                       
+                       // re-create the whole thing for actual processing
+                       
+                       // reset the generators and iterators
+                       generator1.reset();
+                       generator2.reset();
+                       const1Iter.reset();
+                       const2Iter.reset();
+                       gen1Iter.reset();
+                       gen2Iter.reset();
+                       
+                       inList1.clear();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+                       
+                       inList2.clear();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+       
+                       input1 = new MergeIterator<Tuple2<Integer, 
String>>(inList1, comparator1.duplicate());
+                       input2 = new MergeIterator<Tuple2<Integer, 
String>>(inList2, comparator2.duplicate());
+                       
+                       final FlatJoinFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new 
MatchRemovingMatcher(expectedMatchesMap);
+                       
+                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<Tuple2<Integer, String>>();
+       
+                       
+                       // we create this sort-merge iterator with little 
memory for the block-nested-loops fall-back to make sure it
+                       // needs to spill for the duplicate keys
+                       ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                               new 
ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, 
Tuple2<Integer, String>>(
+                                       input1, input2, this.serializer1, 
this.comparator1, this.serializer2, this.comparator2,
+                                       this.pairComparator, 
this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+       
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(matcher, collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<Integer, Collection<Match>> entry : 
expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                                    Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       private Map<Integer, Collection<Match>> matchValues(
+                       Map<Integer, Collection<String>> leftMap,
+                       Map<Integer, Collection<String>> rightMap)
+       {
+               Map<Integer, Collection<Match>> map = new HashMap<Integer, 
Collection<Match>>();
+
+               for (Integer key : leftMap.keySet()) {
+                       Collection<String> leftValues = leftMap.get(key);
+                       Collection<String> rightValues = rightMap.get(key);
+
+                       if (rightValues == null) {
+                               continue;
+                       }
+
+                       if (!map.containsKey(key)) {
+                               map.put(key, new ArrayList<Match>());
+                       }
+
+                       Collection<Match> matchedValues = map.get(key);
+
+                       for (String leftValue : leftValues) {
+                               for (String rightValue : rightValues) {
+                                       matchedValues.add(new Match(leftValue, 
rightValue));
+                               }
+                       }
+               }
+
+               return map;
+       }
+
+       
+       private Map<Integer, Collection<String>> 
collectData(MutableObjectIterator<Tuple2<Integer, String>> iter)
+       throws Exception
+       {
+               Map<Integer, Collection<String>> map = new HashMap<Integer, 
Collection<String>>();
+               Tuple2<Integer, String> pair = new Tuple2<Integer, String>();
+               
+               while ((pair = iter.next(pair)) != null) {
+                       final Integer key = pair.getField(0);
+                       
+                       if (!map.containsKey(key)) {
+                               map.put(key, new ArrayList<String>());
+                       }
+
+                       Collection<String> values = map.get(key);
+                       final String value = pair.getField(1);
+                       values.add(value);
+               }
+
+               return map;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java
deleted file mode 100644
index 474fa3c..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-@SuppressWarnings("deprecation")
-public class ReusingSortMergeMatchIteratorITCase {
-
-       // total memory
-       private static final int MEMORY_SIZE = 1024 * 1024 * 16;
-       private static final int PAGES_FOR_BNLJN = 2;
-
-       // the size of the left and right inputs
-       private static final int INPUT_1_SIZE = 20000;
-
-       private static final int INPUT_2_SIZE = 1000;
-
-       // random seeds for the left and right input data generators
-       private static final long SEED1 = 561349061987311L;
-
-       private static final long SEED2 = 231434613412342L;
-
-       // dummy abstract task
-       private final AbstractInvokable parentTask = new DummyInvokable();
-
-       private IOManager ioManager;
-       private MemoryManager memoryManager;
-
-       private TypeSerializer<Record> serializer1;
-       private TypeSerializer<Record> serializer2;
-       private TypeComparator<Record> comparator1;
-       private TypeComparator<Record> comparator2;
-       private TypePairComparator<Record, Record> pairComparator;
-
-
-       @SuppressWarnings("unchecked")
-       @Before
-       public void beforeTest() {
-               this.serializer1 = RecordSerializer.get();
-               this.serializer2 = RecordSerializer.get();
-               this.comparator1 = new RecordComparator(new int[] {0}, new 
Class[]{TestData.Key.class});
-               this.comparator2 = new RecordComparator(new int[] {0}, new 
Class[]{TestData.Key.class});
-               this.pairComparator = new RecordPairComparator(new int[] {0}, 
new int[] {0}, new Class[]{TestData.Key.class});
-
-               this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-               this.ioManager = new IOManagerAsync();
-       }
-
-       @After
-       public void afterTest() {
-               if (this.ioManager != null) {
-                       this.ioManager.shutdown();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
-                       this.ioManager = null;
-               }
-
-               if (this.memoryManager != null) {
-                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
-                               this.memoryManager.verifyEmpty());
-                       this.memoryManager.shutdown();
-                       this.memoryManager = null;
-               }
-       }
-
-
-
-       @Test
-       public void testMerge() {
-               try {
-
-                       final Generator generator1 = new Generator(SEED1, 500, 
4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-                       final Generator generator2 = new Generator(SEED2, 500, 
2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-
-                       final TestData.GeneratorIterator input1 = new 
TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.GeneratorIterator input2 = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
-
-                       // collect expected data
-                       final Map<TestData.Key, Collection<Match>> 
expectedMatchesMap = matchValues(
-                               collectData(input1),
-                               collectData(input2));
-
-                       final JoinFunction matcher = new 
MatchRemovingMatcher(expectedMatchesMap);
-
-                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
-
-                       // reset the generators
-                       generator1.reset();
-                       generator2.reset();
-                       input1.reset();
-                       input2.reset();
-
-                       // compare with iterator values
-                       ReusingMergeMatchIterator<Record, Record, Record> 
iterator =
-                               new ReusingMergeMatchIterator<Record, Record, 
Record>(
-                                       input1, input2, this.serializer1, 
this.comparator1, this.serializer2, this.comparator2,
-                                       this.pairComparator, 
this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
-
-                       iterator.open();
-
-                       while (iterator.callWithNextKey(matcher, collector));
-
-                       iterator.close();
-
-                       // assert that each expected match was seen
-                       for (Entry<TestData.Key, Collection<Match>> entry : 
expectedMatchesMap.entrySet()) {
-                               Assert.assertTrue("Collection for key " + 
entry.getKey() + " is not empty", entry.getValue().isEmpty());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-
-       @Test
-       public void testMergeWithHighNumberOfCommonKeys()
-       {
-               // the size of the left and right inputs
-               final int INPUT_1_SIZE = 200;
-               final int INPUT_2_SIZE = 100;
-
-               final int INPUT_1_DUPLICATES = 10;
-               final int INPUT_2_DUPLICATES = 4000;
-               final int DUPLICATE_KEY = 13;
-
-               try {
-                       final Generator generator1 = new Generator(SEED1, 500, 
4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-                       final Generator generator2 = new Generator(SEED2, 500, 
2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-                       
-                       final TestData.GeneratorIterator gen1Iter = new 
TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.GeneratorIterator gen2Iter = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       final TestData.ConstantValueIterator const1Iter = new 
TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", 
INPUT_1_DUPLICATES);
-                       final TestData.ConstantValueIterator const2Iter = new 
TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate 
Keys", INPUT_2_DUPLICATES);
-                       
-                       final List<MutableObjectIterator<Record>> inList1 = new 
ArrayList<MutableObjectIterator<Record>>();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       final List<MutableObjectIterator<Record>> inList2 = new 
ArrayList<MutableObjectIterator<Record>>();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-                       
-                       MutableObjectIterator<Record> input1 = new 
MergeIterator<Record>(inList1, comparator1.duplicate());
-                       MutableObjectIterator<Record> input2 = new 
MergeIterator<Record>(inList2, comparator2.duplicate());
-                       
-                       // collect expected data
-                       final Map<TestData.Key, Collection<Match>> 
expectedMatchesMap = matchValues(
-                               collectData(input1),
-                               collectData(input2));
-                       
-                       // re-create the whole thing for actual processing
-                       
-                       // reset the generators and iterators
-                       generator1.reset();
-                       generator2.reset();
-                       const1Iter.reset();
-                       const2Iter.reset();
-                       gen1Iter.reset();
-                       gen2Iter.reset();
-                       
-                       inList1.clear();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       inList2.clear();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-       
-                       input1 = new MergeIterator<Record>(inList1, 
comparator1.duplicate());
-                       input2 = new MergeIterator<Record>(inList2, 
comparator2.duplicate());
-                       
-                       final JoinFunction matcher = new 
MatchRemovingMatcher(expectedMatchesMap);
-                       
-                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
-       
-                       
-                       // we create this sort-merge iterator with little 
memory for the block-nested-loops fall-back to make sure it
-                       // needs to spill for the duplicate keys
-                       ReusingMergeMatchIterator<Record, Record, Record> 
iterator =
-                               new ReusingMergeMatchIterator<Record, Record, 
Record>(
-                                       input1, input2, this.serializer1, 
this.comparator1, this.serializer2, this.comparator2,
-                                       this.pairComparator, 
this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
-       
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<TestData.Key, Collection<Match>> entry : 
expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                    Utilities
-       // 
--------------------------------------------------------------------------------------------
-
-       private Map<TestData.Key, Collection<Match>> matchValues(
-                       Map<TestData.Key, Collection<TestData.Value>> leftMap,
-                       Map<TestData.Key, Collection<TestData.Value>> rightMap)
-       {
-               Map<TestData.Key, Collection<Match>> map = new 
HashMap<TestData.Key, Collection<Match>>();
-
-               for (TestData.Key key : leftMap.keySet()) {
-                       Collection<TestData.Value> leftValues = 
leftMap.get(key);
-                       Collection<TestData.Value> rightValues = 
rightMap.get(key);
-
-                       if (rightValues == null) {
-                               continue;
-                       }
-
-                       if (!map.containsKey(key)) {
-                               map.put(key, new ArrayList<Match>());
-                       }
-
-                       Collection<Match> matchedValues = map.get(key);
-
-                       for (TestData.Value leftValue : leftValues) {
-                               for (TestData.Value rightValue : rightValues) {
-                                       matchedValues.add(new Match(leftValue, 
rightValue));
-                               }
-                       }
-               }
-
-               return map;
-       }
-
-       
-       private Map<TestData.Key, Collection<TestData.Value>> 
collectData(MutableObjectIterator<Record> iter)
-       throws Exception
-       {
-               Map<TestData.Key, Collection<TestData.Value>> map = new 
HashMap<TestData.Key, Collection<TestData.Value>>();
-               Record pair = new Record();
-               
-               while ((pair = iter.next(pair)) != null) {
-                       TestData.Key key = pair.getField(0, TestData.Key.class);
-                       
-                       if (!map.containsKey(key)) {
-                               map.put(new TestData.Key(key.getKey()), new 
ArrayList<TestData.Value>());
-                       }
-
-                       Collection<TestData.Value> values = map.get(key);
-                       values.add(new TestData.Value(pair.getField(1, 
TestData.Value.class).getValue()));
-               }
-
-               return map;
-       }
-
-       /**
-        * Private class used for storage of the expected matches in a hashmap.
-        */
-       private static class Match {
-               private final Value left;
-
-               private final Value right;
-
-               public Match(Value left, Value right) {
-                       this.left = left;
-                       this.right = right;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       Match o = (Match) obj;
-                       return this.left.equals(o.left) && 
this.right.equals(o.right);
-               }
-               
-               @Override
-               public int hashCode() {
-                       return this.left.hashCode() ^ this.right.hashCode();
-               }
-
-               @Override
-               public String toString() {
-                       return left + ", " + right;
-               }
-       }
-       
-       private static final class MatchRemovingMatcher extends JoinFunction {
-               private static final long serialVersionUID = 1L;
-               
-               private final Map<TestData.Key, Collection<Match>> toRemoveFrom;
-               
-               protected MatchRemovingMatcher(Map<TestData.Key, 
Collection<Match>> map) {
-                       this.toRemoveFrom = map;
-               }
-               
-               @Override
-               public void join(Record rec1, Record rec2, Collector<Record> 
out) throws Exception {
-                       TestData.Key key = rec1.getField(0, TestData.Key.class);
-                       TestData.Value value1 = rec1.getField(1, 
TestData.Value.class);
-                       TestData.Value value2 = rec2.getField(1, 
TestData.Value.class);
-                       
-                       Collection<Match> matches = this.toRemoveFrom.get(key);
-                       if (matches == null) {
-                               Assert.fail("Match " + key + " - " + value1 + 
":" + value2 + " is unexpected.");
-                       }
-                       
-                       boolean contained = matches.remove(new Match(value1, 
value2));
-                       if (!contained) {
-                               Assert.fail("Produced match was not contained: 
" + key + " - " + value1 + ":" + value2);
-                       }
-                       if (matches.isEmpty()) {
-                               this.toRemoveFrom.remove(key);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java
new file mode 100644
index 0000000..7fd1b6c
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+
+
+public class CollectionIterator<T> implements 
ResettableMutableObjectIterator<T> {
+
+       private final Collection<T> collection;
+       private Iterator<T> iterator;
+
+       public CollectionIterator(Collection<T> collection) {
+               this.collection = collection;
+               this.iterator = collection.iterator();
+       }
+
+       @Override
+       public T next(T reuse) throws IOException {
+               return next();
+       }
+
+       @Override
+       public T next() throws IOException {
+               if (!iterator.hasNext()) {
+                       return null;
+               } else {
+                       return iterator.next();
+               }
+       }
+
+       @Override
+       public void reset() throws IOException {
+               iterator = collection.iterator();
+       }
+
+       public static <T> CollectionIterator<T> of(T... values) {
+               return new CollectionIterator<T>(Arrays.asList(values));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
new file mode 100644
index 0000000..539d864
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+/**
+ * Utility class for keeping track of matches in join operator tests.
+ *
+ * @see org.apache.flink.runtime.operators.testutils.MatchRemovingMatcher
+ */
+public class Match {
+       private final String left;
+
+       private final String right;
+
+       public Match(String left, String right) {
+               this.left = left;
+               this.right = right;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               Match o = (Match) obj;
+               if (left == null && o.left == null && right.equals(o.right)) {
+                       return true;
+               } else if (right == null && o.right == null && 
left.equals(o.left)) {
+                       return true;
+               } else {
+                       return this.left.equals(o.left) && 
this.right.equals(o.right);
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               if (left == null) {
+                       return right.hashCode();
+               } else if (right == null) {
+                       return left.hashCode();
+               } else {
+                       return this.left.hashCode() ^ this.right.hashCode();
+               }
+       }
+
+       @Override
+       public String toString() {
+               return left + ", " + right;
+       }
+}

Reply via email to