http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
deleted file mode 100644
index 1795062..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
+++ /dev/null
@@ -1,766 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
-import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
-import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-@SuppressWarnings({"serial", "deprecation"})
-public class NonReusingHashMatchIteratorITCase {
-       
-       private static final int MEMORY_SIZE = 16000000;                // 
total memory
-
-       private static final int INPUT_1_SIZE = 20000;
-       private static final int INPUT_2_SIZE = 1000;
-
-       private static final long SEED1 = 561349061987311L;
-       private static final long SEED2 = 231434613412342L;
-       
-       private final AbstractInvokable parentTask = new DummyInvokable();
-
-       private IOManager ioManager;
-       private MemoryManager memoryManager;
-       
-       private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
-       private TypeComparator<Tuple2<Integer, String>> record1Comparator;
-       private TypeComparator<Tuple2<Integer, String>> record2Comparator;
-       private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, 
String>> recordPairComparator;
-       
-       private TypeSerializer<IntPair> pairSerializer;
-       private TypeComparator<IntPair> pairComparator;
-       private TypePairComparator<IntPair, Tuple2<Integer, String>> 
pairRecordPairComparator;
-       private TypePairComparator<Tuple2<Integer, String>, IntPair> 
recordPairPairComparator;
-
-
-       @SuppressWarnings("unchecked")
-       @Before
-       public void beforeTest() {
-               this.recordSerializer = TestData.getIntStringTupleSerializer();
-               
-               this.record1Comparator = TestData.getIntStringTupleComparator();
-               this.record2Comparator = TestData.getIntStringTupleComparator();
-               
-               this.recordPairComparator = new 
GenericPairComparator(record1Comparator, record2Comparator);
-               
-               this.pairSerializer = new IntPairSerializer();
-               this.pairComparator = new TestData.IntPairComparator();
-               this.pairRecordPairComparator = new 
IntPairTuplePairComparator();
-               this.recordPairPairComparator = new 
TupleIntPairPairComparator();
-               
-               this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
-               this.ioManager = new IOManagerAsync();
-       }
-
-       @After
-       public void afterTest() {
-               if (this.ioManager != null) {
-                       this.ioManager.shutdown();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
-                       this.ioManager = null;
-               }
-               
-               if (this.memoryManager != null) {
-                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
-                               this.memoryManager.verifyEmpty());
-                       this.memoryManager.shutdown();
-                       this.memoryManager = null;
-               }
-       }
-
-
-       @Test
-       public void testBuildFirst() {
-               try {
-                       TupleGenerator generator1 = new TupleGenerator(SEED1, 
500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       TupleGenerator generator2 = new TupleGenerator(SEED2, 
500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       
-                       final TestData.TupleGeneratorIterator input1 = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       // collect expected data
-                       final Map<Integer, Collection<TupleMatch>> 
expectedMatchesMap = matchSecondTupleFields(
-                               collectTupleData(input1),
-                               collectTupleData(input2));
-                       
-                       final FlatJoinFunction matcher = new 
TupleMatchRemovingJoin(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<Tuple2<Integer, String>>();
-       
-                       // reset the generators
-                       generator1.reset();
-                       generator2.reset();
-                       input1.reset();
-                       input2.reset();
-       
-                       // compare with iterator values
-                       NonReusingBuildFirstHashMatchIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                                       new 
NonReusingBuildFirstHashMatchIterator<>(
-                                               input1, input2, 
this.recordSerializer, this.record1Comparator, 
-                                               this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
-                                               this.memoryManager, ioManager, 
this.parentTask, 1.0, true);
-                       
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleMatch>> entry : 
expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBuildFirstWithHighNumberOfCommonKeys()
-       {
-               // the size of the left and right inputs
-               final int INPUT_1_SIZE = 200;
-               final int INPUT_2_SIZE = 100;
-               
-               final int INPUT_1_DUPLICATES = 10;
-               final int INPUT_2_DUPLICATES = 2000;
-               final int DUPLICATE_KEY = 13;
-               
-               try {
-                       TupleGenerator generator1 = new TupleGenerator(SEED1, 
500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       TupleGenerator generator2 = new TupleGenerator(SEED2, 
500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       
-                       final TestData.TupleGeneratorIterator gen1Iter = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.TupleGeneratorIterator gen2Iter = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       final TestData.TupleConstantValueIterator const1Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for 
Duplicate Keys", INPUT_1_DUPLICATES);
-                       final TestData.TupleConstantValueIterator const2Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for 
Duplicate Keys", INPUT_2_DUPLICATES);
-                       
-                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList1 = new ArrayList<>();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList2 = new ArrayList<>();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-                       
-                       MutableObjectIterator<Tuple2<Integer, String>> input1 = 
new UnionIterator<>(inList1);
-                       MutableObjectIterator<Tuple2<Integer, String>> input2 = 
new UnionIterator<>(inList2);
-                       
-                       
-                       // collect expected data
-                       final Map<Integer, Collection<TupleMatch>> 
expectedMatchesMap = matchSecondTupleFields(
-                               collectTupleData(input1),
-                               collectTupleData(input2));
-                       
-                       // re-create the whole thing for actual processing
-                       
-                       // reset the generators and iterators
-                       generator1.reset();
-                       generator2.reset();
-                       const1Iter.reset();
-                       const2Iter.reset();
-                       gen1Iter.reset();
-                       gen2Iter.reset();
-                       
-                       inList1.clear();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       inList2.clear();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-       
-                       input1 = new UnionIterator<>(inList1);
-                       input2 = new UnionIterator<>(inList2);
-                       
-                       final FlatJoinFunction matcher = new 
TupleMatchRemovingJoin(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
-       
-                       NonReusingBuildFirstHashMatchIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                                       new 
NonReusingBuildFirstHashMatchIterator<>(
-                                               input1, input2, 
this.recordSerializer, this.record1Comparator, 
-                                               this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
-                                               this.memoryManager, ioManager, 
this.parentTask, 1.0, true);
-
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleMatch>> entry : 
expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBuildSecond() {
-               try {
-                       TupleGenerator generator1 = new TupleGenerator(SEED1, 
500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       TupleGenerator generator2 = new TupleGenerator(SEED2, 
500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       
-                       final TestData.TupleGeneratorIterator input1 = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       // collect expected data
-                       final Map<Integer, Collection<TupleMatch>> 
expectedMatchesMap = matchSecondTupleFields(
-                               collectTupleData(input1),
-                               collectTupleData(input2));
-                       
-                       final FlatJoinFunction matcher = new 
TupleMatchRemovingJoin(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
-       
-                       // reset the generators
-                       generator1.reset();
-                       generator2.reset();
-                       input1.reset();
-                       input2.reset();
-       
-                       // compare with iterator values                 
-                       NonReusingBuildSecondHashMatchIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                               new NonReusingBuildSecondHashMatchIterator<>(
-                                       input1, input2, this.recordSerializer, 
this.record1Comparator, 
-                                       this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
-                                       this.memoryManager, ioManager, 
this.parentTask, 1.0, true);
-
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleMatch>> entry : 
expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBuildSecondWithHighNumberOfCommonKeys()
-       {
-               // the size of the left and right inputs
-               final int INPUT_1_SIZE = 200;
-               final int INPUT_2_SIZE = 100;
-               
-               final int INPUT_1_DUPLICATES = 10;
-               final int INPUT_2_DUPLICATES = 2000;
-               final int DUPLICATE_KEY = 13;
-               
-               try {
-                       TupleGenerator generator1 = new TupleGenerator(SEED1, 
500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       TupleGenerator generator2 = new TupleGenerator(SEED2, 
500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       
-                       final TestData.TupleGeneratorIterator gen1Iter = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.TupleGeneratorIterator gen2Iter = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       final TestData.TupleConstantValueIterator const1Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for 
Duplicate Keys", INPUT_1_DUPLICATES);
-                       final TestData.TupleConstantValueIterator const2Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for 
Duplicate Keys", INPUT_2_DUPLICATES);
-                       
-                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList1 = new ArrayList<>();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList2 = new ArrayList<>();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-                       
-                       MutableObjectIterator<Tuple2<Integer, String>> input1 = 
new UnionIterator<>(inList1);
-                       MutableObjectIterator<Tuple2<Integer, String>> input2 = 
new UnionIterator<>(inList2);
-                       
-                       
-                       // collect expected data
-                       final Map<Integer, Collection<TupleMatch>> 
expectedMatchesMap = matchSecondTupleFields(
-                               collectTupleData(input1),
-                               collectTupleData(input2));
-                       
-                       // re-create the whole thing for actual processing
-                       
-                       // reset the generators and iterators
-                       generator1.reset();
-                       generator2.reset();
-                       const1Iter.reset();
-                       const2Iter.reset();
-                       gen1Iter.reset();
-                       gen2Iter.reset();
-                       
-                       inList1.clear();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       inList2.clear();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-       
-                       input1 = new UnionIterator<>(inList1);
-                       input2 = new UnionIterator<>(inList2);
-                       
-                       final FlatJoinFunction matcher = new 
TupleMatchRemovingJoin(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
-
-                       NonReusingBuildSecondHashMatchIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                               new NonReusingBuildSecondHashMatchIterator<>(
-                                       input1, input2, this.recordSerializer, 
this.record1Comparator, 
-                                       this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
-                                       this.memoryManager, ioManager, 
this.parentTask, 1.0, true);
-                       
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleMatch>> entry : 
expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBuildFirstWithMixedDataTypes() {
-               try {
-                       MutableObjectIterator<IntPair> input1 = new 
UniformIntPairGenerator(500, 40, false);
-                       
-                       final TupleGenerator generator2 = new 
TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       // collect expected data
-                       final Map<Integer, Collection<TupleIntPairMatch>> 
expectedMatchesMap = matchTupleIntPairValues(
-                               collectIntPairData(input1),
-                               collectTupleData(input2));
-                       
-                       final FlatJoinFunction<IntPair, Tuple2<Integer, 
String>, Tuple2<Integer, String>> matcher = new 
TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<Tuple2<Integer, String>>();
-       
-                       // reset the generators
-                       input1 = new UniformIntPairGenerator(500, 40, false);
-                       generator2.reset();
-                       input2.reset();
-       
-                       // compare with iterator values
-                       NonReusingBuildSecondHashMatchIterator<IntPair, 
Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                                       new 
NonReusingBuildSecondHashMatchIterator<>(
-                                               input1, input2, 
this.pairSerializer, this.pairComparator,
-                                               this.recordSerializer, 
this.record2Comparator, this.pairRecordPairComparator,
-                                               this.memoryManager, 
this.ioManager, this.parentTask, 1.0, true);
-                       
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleIntPairMatch>> 
entry : expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBuildSecondWithMixedDataTypes() {
-               try {
-                       MutableObjectIterator<IntPair> input1 = new 
UniformIntPairGenerator(500, 40, false);
-                       
-                       final TupleGenerator generator2 = new 
TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       // collect expected data
-                       final Map<Integer, Collection<TupleIntPairMatch>> 
expectedMatchesMap = matchTupleIntPairValues(
-                               collectIntPairData(input1),
-                               collectTupleData(input2));
-                       
-                       final FlatJoinFunction<IntPair, Tuple2<Integer, 
String>, Tuple2<Integer, String>> matcher = new 
TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
-       
-                       // reset the generators
-                       input1 = new UniformIntPairGenerator(500, 40, false);
-                       generator2.reset();
-                       input2.reset();
-       
-                       // compare with iterator values
-                       NonReusingBuildFirstHashMatchIterator<IntPair, 
Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                                       new 
NonReusingBuildFirstHashMatchIterator<>(
-                                               input1, input2, 
this.pairSerializer, this.pairComparator, 
-                                               this.recordSerializer, 
this.record2Comparator, this.recordPairPairComparator,
-                                               this.memoryManager, 
this.ioManager, this.parentTask, 1.0, true);
-                       
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleIntPairMatch>> 
entry : expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                    Utilities
-       // 
--------------------------------------------------------------------------------------------
-
-       
-       
-       static Map<Integer, Collection<TupleMatch>> matchSecondTupleFields(
-                       Map<Integer, Collection<String>> leftMap,
-                       Map<Integer, Collection<String>> rightMap)
-       {
-               Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
-
-               for (Integer key : leftMap.keySet()) {
-                       Collection<String> leftValues = leftMap.get(key);
-                       Collection<String> rightValues = rightMap.get(key);
-
-                       if (rightValues == null) {
-                               continue;
-                       }
-
-                       if (!map.containsKey(key)) {
-                               map.put(key, new ArrayList<TupleMatch>());
-                       }
-
-                       Collection<TupleMatch> matchedValues = map.get(key);
-
-                       for (String leftValue : leftValues) {
-                               for (String rightValue : rightValues) {
-                                       matchedValues.add(new 
TupleMatch(leftValue, rightValue));
-                               }
-                       }
-               }
-
-               return map;
-       }
-       
-       static Map<Integer, Collection<TupleIntPairMatch>> 
matchTupleIntPairValues(
-               Map<Integer, Collection<Integer>> leftMap,
-               Map<Integer, Collection<String>> rightMap)
-       {
-               final Map<Integer, Collection<TupleIntPairMatch>> map = new 
HashMap<>();
-       
-               for (Integer i : leftMap.keySet()) {
-                       
-                       final Collection<Integer> leftValues = leftMap.get(i);
-                       final Collection<String> rightValues = rightMap.get(i);
-       
-                       if (rightValues == null) {
-                               continue;
-                       }
-       
-                       if (!map.containsKey(i)) {
-                               map.put(i, new ArrayList<TupleIntPairMatch>());
-                       }
-       
-                       final Collection<TupleIntPairMatch> matchedValues = 
map.get(i);
-       
-                       for (Integer v : leftValues) {
-                               for (String val : rightValues) {
-                                       matchedValues.add(new 
TupleIntPairMatch(v, val));
-                               }
-                       }
-               }
-       
-               return map;
-       }
-
-       
-       static Map<Integer, Collection<String>> 
collectTupleData(MutableObjectIterator<Tuple2<Integer, String>> iter)
-       throws Exception
-       {
-               Map<Integer, Collection<String>> map = new HashMap<>();
-               Tuple2<Integer, String> pair = new Tuple2<>();
-               
-               while ((pair = iter.next(pair)) != null) {
-
-                       Integer key = pair.f0;
-                       if (!map.containsKey(key)) {
-                               map.put(key, new ArrayList<String>());
-                       }
-
-                       Collection<String> values = map.get(key);
-                       values.add(pair.f1);
-               }
-
-               return map;
-       }
-       
-       static Map<Integer, Collection<Integer>> 
collectIntPairData(MutableObjectIterator<IntPair> iter)
-       throws Exception
-       {
-               Map<Integer, Collection<Integer>> map = new HashMap<Integer, 
Collection<Integer>>();
-               IntPair pair = new IntPair();
-               
-               while ((pair = iter.next(pair)) != null) {
-
-                       final int key = pair.getKey();
-                       final int value = pair.getValue();
-                       if (!map.containsKey(key)) {
-                               map.put(key, new ArrayList<Integer>());
-                       }
-
-                       Collection<Integer> values = map.get(key);
-                       values.add(value);
-               }
-
-               return map;
-       }
-
-       /**
-        * Private class used for storage of the expected matches in a hash-map.
-        */
-       static class TupleMatch {
-               
-               private final String left;
-               private final String right;
-
-               public TupleMatch(String left, String right) {
-                       this.left = left;
-                       this.right = right;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       TupleMatch o = (TupleMatch) obj;
-                       return this.left.equals(o.left) && 
this.right.equals(o.right);
-               }
-               
-               @Override
-               public int hashCode() {
-                       return this.left.hashCode() ^ this.right.hashCode();
-               }
-
-               @Override
-               public String toString() {
-                       return left + ", " + right;
-               }
-       }
-       
-       /**
-        * Private class used for storage of the expected matches in a hash-map.
-        */
-       static class TupleIntPairMatch
-       {
-               private final int left;
-               private final String right;
-
-               public TupleIntPairMatch(int left, String right) {
-                       this.left = left;
-                       this.right = new String(right);
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       TupleIntPairMatch o = (TupleIntPairMatch) obj;
-                       return this.left == o.left && 
this.right.equals(o.right);
-               }
-               
-               @Override
-               public int hashCode() {
-                       return this.left ^ this.right.hashCode();
-               }
-
-               @Override
-               public String toString() {
-                       return left + ", " + right;
-               }
-       }
-       
-       static final class TupleMatchRemovingJoin implements 
FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, 
Tuple2<Integer, String>>
-       {
-               private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
-               
-               protected TupleMatchRemovingJoin(Map<Integer, 
Collection<TupleMatch>> map) {
-                       this.toRemoveFrom = map;
-               }
-               
-               @Override
-               public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, 
String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
-               {
-                       int key = rec1.f0;
-                       String value1 = rec1.f1;
-                       String value2 = rec2.f1;
-                       //System.err.println("rec1 key = "+key+"  rec2 key= 
"+rec2.f0);
-                       Collection<TupleMatch> matches = 
this.toRemoveFrom.get(key);
-                       if (matches == null) {
-                               Assert.fail("Match " + key + " - " + value1 + 
":" + value2 + " is unexpected.");
-                       }
-                       
-                       Assert.assertTrue("Produced match was not contained: " 
+ key + " - " + value1 + ":" + value2,
-                               matches.remove(new TupleMatch(value1, value2)));
-                       
-                       if (matches.isEmpty()) {
-                               this.toRemoveFrom.remove(key);
-                       }
-               }
-       }
-       
-       static final class TupleIntPairMatchRemovingMatcher implements 
FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>>
-       {
-               private final Map<Integer, Collection<TupleIntPairMatch>> 
toRemoveFrom;
-               
-               protected TupleIntPairMatchRemovingMatcher(Map<Integer, 
Collection<TupleIntPairMatch>> map) {
-                       this.toRemoveFrom = map;
-               }
-               
-               @Override
-               public void join(IntPair rec1, Tuple2<Integer, String> rec2, 
Collector<Tuple2<Integer, String>> out) throws Exception
-               {
-                       final int k = rec1.getKey();
-                       final int v = rec1.getValue(); 
-                       
-                       final Integer key = rec2.f0;
-                       final String value = rec2.f1;
-
-                       Assert.assertTrue("Key does not match for matching 
IntPair Tuple combination.", k == key);
-                       
-                       Collection<TupleIntPairMatch> matches = 
this.toRemoveFrom.get(key);
-                       if (matches == null) {
-                               Assert.fail("Match " + key + " - " + v + ":" + 
value + " is unexpected.");
-                       }
-                       
-                       Assert.assertTrue("Produced match was not contained: " 
+ key + " - " + v + ":" + value,
-                               matches.remove(new TupleIntPairMatch(v, 
value)));
-                       
-                       if (matches.isEmpty()) {
-                               this.toRemoveFrom.remove(key);
-                       }
-               }
-       }
-       
-       static final class IntPairTuplePairComparator extends 
TypePairComparator<IntPair, Tuple2<Integer, String>>
-       {
-               private int reference;
-               
-               @Override
-               public void setReference(IntPair reference) {
-                       this.reference = reference.getKey();    
-               }
-
-               @Override
-               public boolean equalToReference(Tuple2<Integer, String> 
candidate) {
-                       try {
-                               return candidate.f0 == this.reference;
-                       } catch (NullPointerException npex) {
-                               throw new NullKeyFieldException();
-                       }
-               }
-
-               @Override
-               public int compareToReference(Tuple2<Integer, String> 
candidate) {
-                       try {
-                               return candidate.f0 - this.reference;
-                       } catch (NullPointerException npex) {
-                               throw new NullKeyFieldException();
-                       }
-               }
-       }
-       
-       static final class TupleIntPairPairComparator extends 
TypePairComparator<Tuple2<Integer, String>, IntPair>
-       {
-               private int reference;
-               
-               @Override
-               public void setReference(Tuple2<Integer, String> reference) {
-                       this.reference = reference.f0;
-               }
-
-               @Override
-               public boolean equalToReference(IntPair candidate) {
-                       return this.reference == candidate.getKey();
-               }
-
-               @Override
-               public int compareToReference(IntPair candidate) {
-                       return candidate.getKey() - this.reference;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 5a4fc6a..a885e6b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -30,8 +30,8 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import 
org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
-import 
org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.TupleMatch;
-import 
org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.TupleMatchRemovingJoin;
+import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
+import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
@@ -204,7 +204,7 @@ public class NonReusingReOpenableHashTableITCase {
 
        private void doTest(TestData.TupleGeneratorIterator buildInput, 
TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator 
pgen) throws Exception {
                // collect expected data
-               final Map<Integer, Collection<TupleMatch>> 
expectedFirstMatchesMap = 
NonReusingHashMatchIteratorITCase.matchSecondTupleFields(NonReusingHashMatchIteratorITCase.collectTupleData(buildInput),
 NonReusingHashMatchIteratorITCase.collectTupleData(probeInput));
+               final Map<Integer, Collection<TupleMatch>> 
expectedFirstMatchesMap = 
NonReusingHashJoinIteratorITCase.joinTuples(NonReusingHashJoinIteratorITCase.collectTupleData(buildInput),
 NonReusingHashJoinIteratorITCase.collectTupleData(probeInput));
 
                final List<Map<Integer, Collection<TupleMatch>>> 
expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
                final FlatJoinFunction[] nMatcher = new 
TupleMatchRemovingJoin[NUM_PROBES];
@@ -225,11 +225,11 @@ public class NonReusingReOpenableHashTableITCase {
                probeInput.reset();
 
                // compare with iterator values
-               NonReusingBuildFirstReOpenableHashMatchIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                               new 
NonReusingBuildFirstReOpenableHashMatchIterator<>(
+               NonReusingBuildFirstReOpenableHashJoinIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                               new 
NonReusingBuildFirstReOpenableHashJoinIterator<>(
                                                buildInput, probeInput, 
this.recordSerializer, this.record1Comparator,
                                        this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
-                                       this.memoryManager, ioManager, 
this.parentTask, 1.0, true);
+                                       this.memoryManager, ioManager, 
this.parentTask, 1.0, false, true);
 
                iterator.open();
                // do first join with both inputs

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
new file mode 100644
index 0000000..87707a4
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
@@ -0,0 +1,709 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
+import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleIntPairMatch;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
+import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
+import static 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.leftOuterJoinTuples;
+import static 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.rightOuterJoinTuples;
+import static 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinIntPairs;
+import static 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData;
+import static 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectIntPairData;
+
+@SuppressWarnings({"serial", "deprecation"})
+public class ReusingHashJoinIteratorITCase {
+       
+       private static final int MEMORY_SIZE = 16000000;                // 
total memory
+
+       private static final int INPUT_1_SIZE = 20000;
+       private static final int INPUT_2_SIZE = 1000;
+
+       private static final long SEED1 = 561349061987311L;
+       private static final long SEED2 = 231434613412342L;
+       
+       private final AbstractInvokable parentTask = new DummyInvokable();
+
+       private IOManager ioManager;
+       private MemoryManager memoryManager;
+       
+       private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+       private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+       private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+       private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, 
String>> recordPairComparator;
+       
+       private TypeSerializer<IntPair> pairSerializer;
+       private TypeComparator<IntPair> pairComparator;
+       private TypePairComparator<IntPair, Tuple2<Integer, String>> 
pairRecordPairComparator;
+       private TypePairComparator<Tuple2<Integer, String>, IntPair> 
recordPairPairComparator;
+
+
+       @SuppressWarnings("unchecked")
+       @Before
+       public void beforeTest() {
+               this.recordSerializer = TestData.getIntStringTupleSerializer();
+               
+               this.record1Comparator = TestData.getIntStringTupleComparator();
+               this.record2Comparator = TestData.getIntStringTupleComparator();
+               
+               this.recordPairComparator = new 
GenericPairComparator(this.record1Comparator, this.record2Comparator);
+               
+               this.pairSerializer = new IntPairSerializer();
+               this.pairComparator = new TestData.IntPairComparator();
+               this.pairRecordPairComparator = new 
IntPairTuplePairComparator();
+               this.recordPairPairComparator = new 
TupleIntPairPairComparator();
+               
+               this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
+               this.ioManager = new IOManagerAsync();
+       }
+
+       @After
+       public void afterTest() {
+               if (this.ioManager != null) {
+                       this.ioManager.shutdown();
+                       if (!this.ioManager.isProperlyShutDown()) {
+                               Assert.fail("I/O manager failed to properly 
shut down.");
+                       }
+                       this.ioManager = null;
+               }
+               
+               if (this.memoryManager != null) {
+                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
+                               this.memoryManager.verifyEmpty());
+                       this.memoryManager.shutdown();
+                       this.memoryManager = null;
+               }
+       }
+
+
+       @Test
+       public void testBuildFirst() {
+               try {
+                       TestData.TupleGenerator generator1 = new 
TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
+                       TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
+                       
+                       final TestData.TupleGeneratorIterator input1 = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+                       
+                       // collect expected data
+                       final Map<Integer, Collection<TupleMatch>> 
expectedMatchesMap = joinTuples(
+                                       collectTupleData(input1),
+                                       collectTupleData(input2));
+                       
+                       final FlatJoinFunction matcher = new 
TupleMatchRemovingJoin(expectedMatchesMap);
+                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
+       
+                       // reset the generators
+                       generator1.reset();
+                       generator2.reset();
+                       input1.reset();
+                       input2.reset();
+       
+                       // compare with iterator values
+                       ReusingBuildFirstHashJoinIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                                       new ReusingBuildFirstHashJoinIterator<>(
+                                               input1, input2, 
this.recordSerializer, this.record1Comparator, 
+                                               this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
+                                               this.memoryManager, ioManager, 
this.parentTask, 1.0, false, true);
+                       
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(matcher, collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<Integer, Collection<TupleMatch>> entry : 
expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testBuildFirstWithHighNumberOfCommonKeys()
+       {
+               // the size of the left and right inputs
+               final int INPUT_1_SIZE = 200;
+               final int INPUT_2_SIZE = 100;
+               
+               final int INPUT_1_DUPLICATES = 10;
+               final int INPUT_2_DUPLICATES = 2000;
+               final int DUPLICATE_KEY = 13;
+               
+               try {
+                       TestData.TupleGenerator generator1 = new 
TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
+                       TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
+                       
+                       final TestData.TupleGeneratorIterator gen1Iter = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator gen2Iter = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+                       
+                       final TestData.TupleConstantValueIterator const1Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for 
Duplicate Keys", INPUT_1_DUPLICATES);
+                       final TestData.TupleConstantValueIterator const2Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for 
Duplicate Keys", INPUT_2_DUPLICATES);
+                       
+                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList1 = new ArrayList<>();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+                       
+                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList2 = new ArrayList<>();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+                       
+                       MutableObjectIterator<Tuple2<Integer, String>> input1 = 
new UnionIterator<>(inList1);
+                       MutableObjectIterator<Tuple2<Integer, String>> input2 = 
new UnionIterator<>(inList2);
+                       
+                       
+                       // collect expected data
+                       final Map<Integer, Collection<TupleMatch>> 
expectedMatchesMap = joinTuples(
+                                       collectTupleData(input1),
+                                       collectTupleData(input2));
+                       
+                       // re-create the whole thing for actual processing
+                       
+                       // reset the generators and iterators
+                       generator1.reset();
+                       generator2.reset();
+                       const1Iter.reset();
+                       const2Iter.reset();
+                       gen1Iter.reset();
+                       gen2Iter.reset();
+                       
+                       inList1.clear();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+                       
+                       inList2.clear();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+       
+                       input1 = new UnionIterator<>(inList1);
+                       input2 = new UnionIterator<>(inList2);
+                       
+                       final FlatJoinFunction matcher = new 
TupleMatchRemovingJoin(expectedMatchesMap);
+                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
+       
+                       ReusingBuildFirstHashJoinIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                                       new ReusingBuildFirstHashJoinIterator<>(
+                                               input1, input2, 
this.recordSerializer, this.record1Comparator, 
+                                               this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
+                                               this.memoryManager, ioManager, 
this.parentTask, 1.0, false, true);
+
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(matcher, collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<Integer, Collection<TupleMatch>> entry : 
expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testBuildSecond() {
+               try {
+                       TestData.TupleGenerator generator1 = new 
TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
+                       TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
+                       
+                       final TestData.TupleGeneratorIterator input1 = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+                       
+                       // collect expected data
+                       final Map<Integer, Collection<TupleMatch>> 
expectedMatchesMap = joinTuples(
+                                       collectTupleData(input1),
+                                       collectTupleData(input2));
+                       
+                       final FlatJoinFunction matcher = new 
TupleMatchRemovingJoin(expectedMatchesMap);
+                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
+       
+                       // reset the generators
+                       generator1.reset();
+                       generator2.reset();
+                       input1.reset();
+                       input2.reset();
+       
+                       // compare with iterator values                 
+                       ReusingBuildSecondHashJoinIterator<Tuple2<Integer, 
String>,Tuple2<Integer, String> ,Tuple2<Integer, String> > iterator =
+                               new ReusingBuildSecondHashJoinIterator<>(
+                                       input1, input2, this.recordSerializer, 
this.record1Comparator, 
+                                       this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
+                                       this.memoryManager, ioManager, 
this.parentTask, 1.0, false, true);
+
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(matcher, collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<Integer, Collection<TupleMatch>> entry : 
expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testBuildSecondWithHighNumberOfCommonKeys()
+       {
+               // the size of the left and right inputs
+               final int INPUT_1_SIZE = 200;
+               final int INPUT_2_SIZE = 100;
+               
+               final int INPUT_1_DUPLICATES = 10;
+               final int INPUT_2_DUPLICATES = 2000;
+               final int DUPLICATE_KEY = 13;
+               
+               try {
+                       TestData.TupleGenerator generator1 = new 
TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
+                       TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
+                       
+                       final TestData.TupleGeneratorIterator gen1Iter = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator gen2Iter = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+                       
+                       final TestData.TupleConstantValueIterator const1Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for 
Duplicate Keys", INPUT_1_DUPLICATES);
+                       final TestData.TupleConstantValueIterator const2Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for 
Duplicate Keys", INPUT_2_DUPLICATES);
+                       
+                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList1 = new ArrayList<>();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+                       
+                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList2 = new ArrayList<>();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+                       
+                       MutableObjectIterator<Tuple2<Integer, String>> input1 = 
new UnionIterator<>(inList1);
+                       MutableObjectIterator<Tuple2<Integer, String>> input2 = 
new UnionIterator<>(inList2);
+                       
+                       
+                       // collect expected data
+                       final Map<Integer, Collection<TupleMatch>> 
expectedMatchesMap = joinTuples(
+                                       collectTupleData(input1),
+                                       collectTupleData(input2));
+                       
+                       // re-create the whole thing for actual processing
+                       
+                       // reset the generators and iterators
+                       generator1.reset();
+                       generator2.reset();
+                       const1Iter.reset();
+                       const2Iter.reset();
+                       gen1Iter.reset();
+                       gen2Iter.reset();
+                       
+                       inList1.clear();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+                       
+                       inList2.clear();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+       
+                       input1 = new UnionIterator<>(inList1);
+                       input2 = new UnionIterator<>(inList2);
+                       
+                       final FlatJoinFunction matcher = new 
TupleMatchRemovingJoin(expectedMatchesMap);
+                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
+       
+                       ReusingBuildSecondHashJoinIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                               new ReusingBuildSecondHashJoinIterator<>(
+                                       input1, input2, this.recordSerializer, 
this.record1Comparator, 
+                                       this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
+                                       this.memoryManager, ioManager, 
this.parentTask, 1.0, false, true);
+                       
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(matcher, collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<Integer, Collection<TupleMatch>> entry : 
expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       /**
+        * Joins a stream of {@link IntPair}s with a stream of int/string tuples and verifies that the
+        * hash join iterator produces exactly the matches pre-computed by {@code joinIntPairs}.
+        * Every produced match is removed from the expected map by the join function, so the map
+        * must hold no non-empty collection once the iterator is exhausted.
+        *
+        * <p>NOTE(review): the method name says "build first", but the iterator instantiated here is
+        * a {@code ReusingBuildSecondHashJoinIterator} - confirm which of the two is intended.
+        */
+       @Test
+       public void testBuildFirstWithMixedDataTypes() {
+               try {
+                       // first pass over both inputs: materialize them to compute the expected result
+                       final MutableObjectIterator<IntPair> pairsForExpectation = new UniformIntPairGenerator(500, 40, false);
+
+                       final TestData.TupleGenerator tupleGenerator =
+                                       new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       final TestData.TupleGeneratorIterator tupleInput =
+                                       new TestData.TupleGeneratorIterator(tupleGenerator, INPUT_2_SIZE);
+
+                       final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = joinIntPairs(
+                                       collectIntPairData(pairsForExpectation),
+                                       collectTupleData(tupleInput));
+
+                       final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher =
+                                       new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+                       final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+                       // second pass: a fresh pair generator and a rewound tuple input feed the join itself
+                       final MutableObjectIterator<IntPair> pairInput = new UniformIntPairGenerator(500, 40, false);
+                       tupleGenerator.reset();
+                       tupleInput.reset();
+
+                       final ReusingBuildSecondHashJoinIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> joinIterator =
+                                       new ReusingBuildSecondHashJoinIterator<>(
+                                               pairInput, tupleInput,
+                                               this.pairSerializer, this.pairComparator,
+                                               this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
+                                               this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+
+                       joinIterator.open();
+                       while (joinIterator.callWithNextKey(matcher, collector)) {
+                               // nothing to do here: the matcher validates each produced match
+                       }
+                       joinIterator.close();
+
+                       // any match still present in the map was expected but never produced
+                       for (Entry<Integer, Collection<TupleIntPairMatch>> remaining : expectedMatchesMap.entrySet()) {
+                               Assert.assertTrue("Collection for key " + remaining.getKey() + " is not empty",
+                                               remaining.getValue().isEmpty());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + e.getMessage());
+               }
+       }
+       
+       /**
+        * Counterpart of the other mixed-data-type test: joins {@link IntPair}s with int/string
+        * tuples and verifies that every match pre-computed by {@code joinIntPairs} is produced
+        * exactly once by the hash join iterator.
+        *
+        * <p>NOTE(review): the method name says "build second", but the iterator instantiated here is
+        * a {@code ReusingBuildFirstHashJoinIterator} - confirm which of the two is intended.
+        */
+       @Test
+       public void testBuildSecondWithMixedDataTypes() {
+               try {
+                       // first pass over both inputs: materialize them to compute the expected result
+                       final MutableObjectIterator<IntPair> pairsForExpectation = new UniformIntPairGenerator(500, 40, false);
+
+                       final TestData.TupleGenerator tupleGenerator =
+                                       new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       final TestData.TupleGeneratorIterator tupleInput =
+                                       new TestData.TupleGeneratorIterator(tupleGenerator, INPUT_2_SIZE);
+
+                       final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = joinIntPairs(
+                                       collectIntPairData(pairsForExpectation),
+                                       collectTupleData(tupleInput));
+
+                       final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher =
+                                       new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+                       final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+                       // second pass: a fresh pair generator and a rewound tuple input feed the join itself
+                       final MutableObjectIterator<IntPair> pairInput = new UniformIntPairGenerator(500, 40, false);
+                       tupleGenerator.reset();
+                       tupleInput.reset();
+
+                       final ReusingBuildFirstHashJoinIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> joinIterator =
+                                       new ReusingBuildFirstHashJoinIterator<>(
+                                               pairInput, tupleInput,
+                                               this.pairSerializer, this.pairComparator,
+                                               this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
+                                               this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+
+                       joinIterator.open();
+                       while (joinIterator.callWithNextKey(matcher, collector)) {
+                               // nothing to do here: the matcher validates each produced match
+                       }
+                       joinIterator.close();
+
+                       // any match still present in the map was expected but never produced
+                       for (Entry<Integer, Collection<TupleIntPairMatch>> remaining : expectedMatchesMap.entrySet()) {
+                               Assert.assertTrue("Collection for key " + remaining.getKey() + " is not empty",
+                                               remaining.getValue().isEmpty());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + e.getMessage());
+               }
+       }
+
+       /**
+        * Runs a {@code ReusingBuildFirstHashJoinIterator} over two generated tuple inputs and
+        * verifies the produced matches against the result of {@code rightOuterJoinTuples}.
+        * Every produced match is removed from the expected map by the join function, so the map
+        * must hold no non-empty collection once the iterator is exhausted.
+        *
+        * <p>NOTE(review): both inputs are sized by INPUT_1_SIZE / INPUT_2_SIZE, which are defined
+        * outside this excerpt, so it is not visible here which side (if any) is actually empty as
+        * the method name suggests - confirm.
+        */
+       @Test
+       public void testBuildFirstJoinWithEmptyBuild() {
+               try {
+                       final TestData.TupleGenerator generator1 =
+                                       new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       final TestData.TupleGenerator generator2 =
+                                       new TestData.TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+                       final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+                       // collect expected data
+                       final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
+                                       collectTupleData(input1),
+                                       collectTupleData(input2));
+
+                       // fully parameterized (was a raw FlatJoinFunction) so the call below is type-checked
+                       final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher =
+                                       new TupleMatchRemovingJoin(expectedMatchesMap);
+                       final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+                       // rewind generators and iterators so the join sees the same data again
+                       generator1.reset();
+                       generator2.reset();
+                       input1.reset();
+                       input2.reset();
+
+                       // compare with iterator values
+                       final ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                                       new ReusingBuildFirstHashJoinIterator<>(
+                                               input1, input2,
+                                               this.recordSerializer, this.record1Comparator,
+                                               this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+                                               this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
+
+                       iterator.open();
+                       while (iterator.callWithNextKey(matcher, collector)) {
+                               // nothing to do here: the matcher validates each produced match
+                       }
+                       iterator.close();
+
+                       // assert that each expected match was seen
+                       for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + e.getMessage());
+               }
+       }
+
+       /**
+        * Runs a {@code ReusingBuildSecondHashJoinIterator} over two generated tuple inputs and
+        * verifies the produced matches against the result of {@code leftOuterJoinTuples}.
+        * Every produced match is removed from the expected map by the join function, so the map
+        * must hold no non-empty collection once the iterator is exhausted.
+        *
+        * <p>NOTE(review): both inputs are sized by INPUT_1_SIZE / INPUT_2_SIZE, which are defined
+        * outside this excerpt, so it is not visible here which side (if any) is actually empty as
+        * the method name suggests - confirm.
+        */
+       @Test
+       public void testBuildSecondJoinWithEmptyBuild() {
+               try {
+                       final TestData.TupleGenerator generator1 =
+                                       new TestData.TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       final TestData.TupleGenerator generator2 =
+                                       new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+                       final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+                       // collect expected data
+                       final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
+                                       collectTupleData(input1),
+                                       collectTupleData(input2));
+
+                       // fully parameterized (was a raw FlatJoinFunction) so the call below is type-checked
+                       final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher =
+                                       new TupleMatchRemovingJoin(expectedMatchesMap);
+                       final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+                       // rewind generators and iterators so the join sees the same data again
+                       generator1.reset();
+                       generator2.reset();
+                       input1.reset();
+                       input2.reset();
+
+                       // compare with iterator values
+                       final ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                                       new ReusingBuildSecondHashJoinIterator<>(
+                                               input1, input2,
+                                               this.recordSerializer, this.record1Comparator,
+                                               this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+                                               this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
+
+                       iterator.open();
+                       while (iterator.callWithNextKey(matcher, collector)) {
+                               // nothing to do here: the matcher validates each produced match
+                       }
+                       iterator.close();
+
+                       // assert that each expected match was seen
+                       for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + e.getMessage());
+               }
+       }
+
+       
+       // --------------------------------------------------------------------------------------------
+       //                                    Utilities
+       // --------------------------------------------------------------------------------------------
+
+
+       static final class TupleMatchRemovingJoin implements 
FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, 
Tuple2<Integer, String>>
+       {
+               private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
+               
+               protected TupleMatchRemovingJoin(Map<Integer, 
Collection<TupleMatch>> map) {
+                       this.toRemoveFrom = map;
+               }
+               
+               @Override
+               public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, 
String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
+               {
+                       Integer key = rec1 != null ? rec1.f0 : rec2.f0;
+                       String value1 = rec1 != null ? rec1.f1 : null;
+                       String value2 = rec2 != null ? rec2.f1 : null;
+                       //System.err.println("rec1 key = "+key+"  rec2 key= 
"+rec2.getField(0, TestData.Key.class));
+                       Collection<TupleMatch> matches = 
this.toRemoveFrom.get(key);
+                       if (matches == null) {
+                               Assert.fail("Match " + key + " - " + value1 + 
":" + value2 + " is unexpected.");
+                       }
+                       
+                       Assert.assertTrue("Produced match was not contained: " 
+ key + " - " + value1 + ":" + value2,
+                               matches.remove(new TupleMatch(value1, value2)));
+                       
+                       if (matches.isEmpty()) {
+                               this.toRemoveFrom.remove(key);
+                       }
+               }
+       }
+       
+       static final class TupleIntPairMatchRemovingMatcher extends 
AbstractRichFunction implements FlatJoinFunction<IntPair, Tuple2<Integer, 
String>, Tuple2<Integer, String>>
+       {
+               private final Map<Integer, Collection<TupleIntPairMatch>> 
toRemoveFrom;
+               
+               protected TupleIntPairMatchRemovingMatcher(Map<Integer, 
Collection<TupleIntPairMatch>> map) {
+                       this.toRemoveFrom = map;
+               }
+               
+               @Override
+               public void join(IntPair rec1, Tuple2<Integer, String> rec2, 
Collector<Tuple2<Integer, String>> out) throws Exception
+               {
+                       final int k = rec1.getKey();
+                       final int v = rec1.getValue(); 
+                       
+                       final Integer key = rec2.f0;
+                       final String value = rec2.f1;
+                       
+                       Assert.assertTrue("Key does not match for matching 
IntPair Tuple combination.", k == key); 
+                       
+                       Collection<TupleIntPairMatch> matches = 
this.toRemoveFrom.get(key);
+                       if (matches == null) {
+                               Assert.fail("Match " + key + " - " + v + ":" + 
value + " is unexpected.");
+                       }
+                       
+                       Assert.assertTrue("Produced match was not contained: " 
+ key + " - " + v + ":" + value,
+                               matches.remove(new TupleIntPairMatch(v, 
value)));
+                       
+                       if (matches.isEmpty()) {
+                               this.toRemoveFrom.remove(key);
+                       }
+               }
+       }
+       
+       /**
+        * Pair comparator that takes an {@link IntPair} as reference and compares int/string tuple
+        * candidates against the reference's key through their first field ({@code f0}).
+        */
+       static final class IntPairTuplePairComparator extends TypePairComparator<IntPair, Tuple2<Integer, String>>
+       {
+               /** Key of the most recently set reference pair. */
+               private int reference;
+
+               @Override
+               public void setReference(IntPair reference) {
+                       this.reference = reference.getKey();
+               }
+
+               /**
+                * @return {@code true} if the candidate's key equals the reference key
+                * @throws NullKeyFieldException if dereferencing the candidate or unboxing its key fails
+                */
+               @Override
+               public boolean equalToReference(Tuple2<Integer, String> candidate) {
+                       try {
+                               return candidate.f0 == this.reference;
+                       } catch (NullPointerException npex) {
+                               throw new NullKeyFieldException();
+                       }
+               }
+
+               /**
+                * @return a negative, zero or positive value as the candidate's key is smaller than,
+                *         equal to or greater than the reference key
+                * @throws NullKeyFieldException if dereferencing the candidate or unboxing its key fails
+                */
+               @Override
+               public int compareToReference(Tuple2<Integer, String> candidate) {
+                       try {
+                               // Integer.compare yields the same sign as the former subtraction,
+                               // but cannot overflow for keys that are far apart
+                               return Integer.compare(candidate.f0, this.reference);
+                       } catch (NullPointerException npex) {
+                               throw new NullKeyFieldException();
+                       }
+               }
+       }
+       
+       /**
+        * Pair comparator that takes an int/string tuple as reference (using its first field
+        * {@code f0} as key) and compares {@link IntPair} candidates against that key.
+        */
+       static final class TupleIntPairPairComparator extends TypePairComparator<Tuple2<Integer, String>, IntPair>
+       {
+               /** Key of the most recently set reference tuple. */
+               private int reference;
+
+               @Override
+               public void setReference(Tuple2<Integer, String> reference) {
+                       this.reference = reference.f0;
+               }
+
+               @Override
+               public boolean equalToReference(IntPair candidate) {
+                       return this.reference == candidate.getKey();
+               }
+
+               /**
+                * @return a negative, zero or positive value as the candidate's key is smaller than,
+                *         equal to or greater than the reference key
+                */
+               @Override
+               public int compareToReference(IntPair candidate) {
+                       // Integer.compare yields the same sign as the former subtraction,
+                       // but cannot overflow for keys that are far apart
+                       return Integer.compare(candidate.getKey(), this.reference);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
deleted file mode 100644
index 12f4a32..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
+++ /dev/null
@@ -1,768 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
-import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "deprecation"})
-public class ReusingHashMatchIteratorITCase {
-       
-       private static final int MEMORY_SIZE = 16000000;                // 
total memory
-
-       private static final int INPUT_1_SIZE = 20000;
-       private static final int INPUT_2_SIZE = 1000;
-
-       private static final long SEED1 = 561349061987311L;
-       private static final long SEED2 = 231434613412342L;
-       
-       private final AbstractInvokable parentTask = new DummyInvokable();
-
-       private IOManager ioManager;
-       private MemoryManager memoryManager;
-       
-       private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
-       private TypeComparator<Tuple2<Integer, String>> record1Comparator;
-       private TypeComparator<Tuple2<Integer, String>> record2Comparator;
-       private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, 
String>> recordPairComparator;
-       
-       private TypeSerializer<IntPair> pairSerializer;
-       private TypeComparator<IntPair> pairComparator;
-       private TypePairComparator<IntPair, Tuple2<Integer, String>> 
pairRecordPairComparator;
-       private TypePairComparator<Tuple2<Integer, String>, IntPair> 
recordPairPairComparator;
-
-
-       /**
-        * Creates the serializers, comparators, memory manager and I/O manager used by the tests.
-        * Runs before every test method.
-        */
-       @SuppressWarnings("unchecked")
-       @Before
-       public void beforeTest() {
-               // tuple side: one serializer, one comparator per input, and the pair comparator joining them
-               this.recordSerializer = TestData.getIntStringTupleSerializer();
-               this.record1Comparator = TestData.getIntStringTupleComparator();
-               this.record2Comparator = TestData.getIntStringTupleComparator();
-               // diamond instead of the former raw type, so the element types are checked by the compiler
-               this.recordPairComparator = new GenericPairComparator<>(this.record1Comparator, this.record2Comparator);
-
-               // IntPair side and the mixed-type pair comparators
-               this.pairSerializer = new IntPairSerializer();
-               this.pairComparator = new TestData.IntPairComparator();
-               this.pairRecordPairComparator = new IntPairTuplePairComparator();
-               this.recordPairPairComparator = new TupleIntPairPairComparator();
-
-               this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
-               this.ioManager = new IOManagerAsync();
-       }
-
-       /**
-        * Shuts down the I/O manager and the memory manager after every test. Fails the test if the
-        * I/O manager did not shut down properly or if the memory manager reports that it is not empty.
-        */
-       @After
-       public void afterTest() {
-               final IOManager io = this.ioManager;
-               if (io != null) {
-                       io.shutdown();
-                       Assert.assertTrue("I/O manager failed to properly shut down.", io.isProperlyShutDown());
-                       this.ioManager = null;
-               }
-
-               final MemoryManager memory = this.memoryManager;
-               if (memory != null) {
-                       // checked before shutdown: memory that was not returned counts as a leak
-                       if (!memory.verifyEmpty()) {
-                               Assert.fail("Memory Leak: Not all memory has been returned to the memory manager.");
-                       }
-                       memory.shutdown();
-                       this.memoryManager = null;
-               }
-       }
-
-
-       /**
-        * Runs a {@code ReusingBuildFirstHashMatchIterator} over two generated tuple inputs and
-        * verifies the produced matches against the result of {@code matchSecondTupleFields}.
-        * Every produced match is removed from the expected map by the join function, so the map
-        * must hold no non-empty collection once the iterator is exhausted.
-        */
-       @Test
-       public void testBuildFirst() {
-               try {
-                       final TestData.TupleGenerator generator1 =
-                                       new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       final TestData.TupleGenerator generator2 =
-                                       new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
-                       final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
-                       // collect expected data
-                       final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
-                                       collectTupleData(input1),
-                                       collectTupleData(input2));
-
-                       // fully parameterized (was a raw FlatJoinFunction) so the call below is type-checked
-                       final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher =
-                                       new TupleMatchRemovingJoin(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
-                       // rewind generators and iterators so the join sees the same data again
-                       generator1.reset();
-                       generator2.reset();
-                       input1.reset();
-                       input2.reset();
-
-                       // compare with iterator values
-                       final ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                                       new ReusingBuildFirstHashMatchIterator<>(
-                                               input1, input2,
-                                               this.recordSerializer, this.record1Comparator,
-                                               this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-                                               this.memoryManager, ioManager, this.parentTask, 1.0, true);
-
-                       iterator.open();
-                       while (iterator.callWithNextKey(matcher, collector)) {
-                               // nothing to do here: the matcher validates each produced match
-                       }
-                       iterator.close();
-
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBuildFirstWithHighNumberOfCommonKeys()
-       {
-               // the size of the left and right inputs
-               final int INPUT_1_SIZE = 200;
-               final int INPUT_2_SIZE = 100;
-               
-               final int INPUT_1_DUPLICATES = 10;
-               final int INPUT_2_DUPLICATES = 2000;
-               final int DUPLICATE_KEY = 13;
-               
-               try {
-                       TestData.TupleGenerator generator1 = new 
TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
-                       TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
-                       
-                       final TestData.TupleGeneratorIterator gen1Iter = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.TupleGeneratorIterator gen2Iter = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       final TestData.TupleConstantValueIterator const1Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for 
Duplicate Keys", INPUT_1_DUPLICATES);
-                       final TestData.TupleConstantValueIterator const2Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for 
Duplicate Keys", INPUT_2_DUPLICATES);
-                       
-                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList1 = new ArrayList<>();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList2 = new ArrayList<>();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-                       
-                       MutableObjectIterator<Tuple2<Integer, String>> input1 = 
new UnionIterator<>(inList1);
-                       MutableObjectIterator<Tuple2<Integer, String>> input2 = 
new UnionIterator<>(inList2);
-                       
-                       
-                       // collect expected data
-                       final Map<Integer, Collection<TupleMatch>> 
expectedMatchesMap = matchSecondTupleFields(
-                               collectTupleData(input1),
-                               collectTupleData(input2));
-                       
-                       // re-create the whole thing for actual processing
-                       
-                       // reset the generators and iterators
-                       generator1.reset();
-                       generator2.reset();
-                       const1Iter.reset();
-                       const2Iter.reset();
-                       gen1Iter.reset();
-                       gen2Iter.reset();
-                       
-                       inList1.clear();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       inList2.clear();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-       
-                       input1 = new UnionIterator<Tuple2<Integer, 
String>>(inList1);
-                       input2 = new UnionIterator<Tuple2<Integer, 
String>>(inList2);
-                       
-                       final FlatJoinFunction matcher = new 
TupleMatchRemovingJoin(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
-       
-                       ReusingBuildFirstHashMatchIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                                       new 
ReusingBuildFirstHashMatchIterator<>(
-                                               input1, input2, 
this.recordSerializer, this.record1Comparator, 
-                                               this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
-                                               this.memoryManager, ioManager, 
this.parentTask, 1.0, true);
-
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleMatch>> entry : 
expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBuildSecond() {
-               try {
-                       TestData.TupleGenerator generator1 = new 
TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
-                       TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
-                       
-                       final TestData.TupleGeneratorIterator input1 = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       // collect expected data
-                       final Map<Integer, Collection<TupleMatch>> 
expectedMatchesMap = matchSecondTupleFields(
-                               collectTupleData(input1),
-                               collectTupleData(input2));
-                       
-                       final FlatJoinFunction matcher = new 
TupleMatchRemovingJoin(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
-       
-                       // reset the generators
-                       generator1.reset();
-                       generator2.reset();
-                       input1.reset();
-                       input2.reset();
-       
-                       // compare with iterator values                 
-                       ReusingBuildSecondHashMatchIterator<Tuple2<Integer, 
String>,Tuple2<Integer, String> ,Tuple2<Integer, String> > iterator =
-                               new ReusingBuildSecondHashMatchIterator<>(
-                                       input1, input2, this.recordSerializer, 
this.record1Comparator, 
-                                       this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
-                                       this.memoryManager, ioManager, 
this.parentTask, 1.0, true);
-
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleMatch>> entry : 
expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBuildSecondWithHighNumberOfCommonKeys()
-       {
-               // the size of the left and right inputs
-               final int INPUT_1_SIZE = 200;
-               final int INPUT_2_SIZE = 100;
-               
-               final int INPUT_1_DUPLICATES = 10;
-               final int INPUT_2_DUPLICATES = 2000;
-               final int DUPLICATE_KEY = 13;
-               
-               try {
-                       TestData.TupleGenerator generator1 = new 
TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
-                       TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
-                       
-                       final TestData.TupleGeneratorIterator gen1Iter = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.TupleGeneratorIterator gen2Iter = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       final TestData.TupleConstantValueIterator const1Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for 
Duplicate Keys", INPUT_1_DUPLICATES);
-                       final TestData.TupleConstantValueIterator const2Iter = 
new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for 
Duplicate Keys", INPUT_2_DUPLICATES);
-                       
-                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList1 = new ArrayList<>();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       final List<MutableObjectIterator<Tuple2<Integer, 
String>>> inList2 = new ArrayList<>();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-                       
-                       MutableObjectIterator<Tuple2<Integer, String>> input1 = 
new UnionIterator<>(inList1);
-                       MutableObjectIterator<Tuple2<Integer, String>> input2 = 
new UnionIterator<>(inList2);
-                       
-                       
-                       // collect expected data
-                       final Map<Integer, Collection<TupleMatch>> 
expectedMatchesMap = matchSecondTupleFields(
-                               collectTupleData(input1),
-                               collectTupleData(input2));
-                       
-                       // re-create the whole thing for actual processing
-                       
-                       // reset the generators and iterators
-                       generator1.reset();
-                       generator2.reset();
-                       const1Iter.reset();
-                       const2Iter.reset();
-                       gen1Iter.reset();
-                       gen2Iter.reset();
-                       
-                       inList1.clear();
-                       inList1.add(gen1Iter);
-                       inList1.add(const1Iter);
-                       
-                       inList2.clear();
-                       inList2.add(gen2Iter);
-                       inList2.add(const2Iter);
-       
-                       input1 = new UnionIterator<>(inList1);
-                       input2 = new UnionIterator<>(inList2);
-                       
-                       final FlatJoinFunction matcher = new 
TupleMatchRemovingJoin(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
-       
-                       ReusingBuildSecondHashMatchIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                               new ReusingBuildSecondHashMatchIterator<>(
-                                       input1, input2, this.recordSerializer, 
this.record1Comparator, 
-                                       this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
-                                       this.memoryManager, ioManager, 
this.parentTask, 1.0, true);
-                       
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleMatch>> entry : 
expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBuildFirstWithMixedDataTypes() {
-               try {
-                       MutableObjectIterator<IntPair> input1 = new 
UniformIntPairGenerator(500, 40, false);
-                       
-                       final TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
-                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       // collect expected data
-                       final Map<Integer, Collection<TupleIntPairMatch>> 
expectedMatchesMap = matchTupleIntPairValues(
-                               collectIntPairData(input1),
-                               collectTupleData(input2));
-                       
-                       final FlatJoinFunction<IntPair, Tuple2<Integer, 
String>, Tuple2<Integer, String>> matcher = new 
TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
-       
-                       // reset the generators
-                       input1 = new UniformIntPairGenerator(500, 40, false);
-                       generator2.reset();
-                       input2.reset();
-       
-                       // compare with iterator values
-                       ReusingBuildSecondHashMatchIterator<IntPair, 
Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                                       new 
ReusingBuildSecondHashMatchIterator<>(
-                                               input1, input2, 
this.pairSerializer, this.pairComparator,
-                                               this.recordSerializer, 
this.record2Comparator, this.pairRecordPairComparator,
-                                               this.memoryManager, 
this.ioManager, this.parentTask, 1.0, true);
-                       
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleIntPairMatch>> 
entry : expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testBuildSecondWithMixedDataTypes() {
-               try {
-                       MutableObjectIterator<IntPair> input1 = new 
UniformIntPairGenerator(500, 40, false);
-                       
-                       final TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
-                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-                       
-                       // collect expected data
-                       final Map<Integer, Collection<TupleIntPairMatch>> 
expectedMatchesMap = matchTupleIntPairValues(
-                               collectIntPairData(input1),
-                               collectTupleData(input2));
-                       
-                       final FlatJoinFunction<IntPair, Tuple2<Integer, 
String>, Tuple2<Integer, String>> matcher = new 
TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
-                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
-       
-                       // reset the generators
-                       input1 = new UniformIntPairGenerator(500, 40, false);
-                       generator2.reset();
-                       input2.reset();
-       
-                       // compare with iterator values
-                       ReusingBuildFirstHashMatchIterator<IntPair, 
Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-                                       new 
ReusingBuildFirstHashMatchIterator<>(
-                                               input1, input2, 
this.pairSerializer, this.pairComparator, 
-                                               this.recordSerializer, 
this.record2Comparator, this.recordPairPairComparator,
-                                               this.memoryManager, 
this.ioManager, this.parentTask, 1.0, true);
-                       
-                       iterator.open();
-                       
-                       while (iterator.callWithNextKey(matcher, collector));
-                       
-                       iterator.close();
-       
-                       // assert that each expected match was seen
-                       for (Entry<Integer, Collection<TupleIntPairMatch>> 
entry : expectedMatchesMap.entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                    Utilities
-       // 
--------------------------------------------------------------------------------------------
-
-       
-       
-       static Map<Integer, Collection<TupleMatch>> matchSecondTupleFields(
-                       Map<Integer, Collection<String>> leftMap,
-                       Map<Integer, Collection<String>> rightMap)
-       {
-               Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
-
-               for (Integer key : leftMap.keySet()) {
-                       Collection<String> leftValues = leftMap.get(key);
-                       Collection<String> rightValues = rightMap.get(key);
-
-                       if (rightValues == null) {
-                               continue;
-                       }
-
-                       if (!map.containsKey(key)) {
-                               map.put(key, new ArrayList<TupleMatch>());
-                       }
-
-                       Collection<TupleMatch> matchedValues = map.get(key);
-
-                       for (String leftValue : leftValues) {
-                               for (String rightValue : rightValues) {
-                                       matchedValues.add(new 
TupleMatch(leftValue, rightValue));
-                               }
-                       }
-               }
-
-               return map;
-       }
-       
-       static Map<Integer, Collection<TupleIntPairMatch>> 
matchTupleIntPairValues(
-               Map<Integer, Collection<Integer>> leftMap,
-               Map<Integer, Collection<String>> rightMap)
-       {
-               final Map<Integer, Collection<TupleIntPairMatch>> map = new 
HashMap<>();
-       
-               for (Integer i : leftMap.keySet()) {
-                       
-                       final Integer key = new Integer(i.intValue());
-                       
-                       final Collection<Integer> leftValues = leftMap.get(i);
-                       final Collection<String> rightValues = 
rightMap.get(key);
-       
-                       if (rightValues == null) {
-                               continue;
-                       }
-       
-                       if (!map.containsKey(key)) {
-                               map.put(key, new 
ArrayList<TupleIntPairMatch>());
-                       }
-       
-                       final Collection<TupleIntPairMatch> matchedValues = 
map.get(key);
-       
-                       for (Integer v : leftValues) {
-                               for (String val : rightValues) {
-                                       matchedValues.add(new 
TupleIntPairMatch(v, val));
-                               }
-                       }
-               }
-       
-               return map;
-       }
-
-       
-       static Map<Integer, Collection<String>> 
collectTupleData(MutableObjectIterator<Tuple2<Integer, String>> iter)
-       throws Exception
-       {
-               Map<Integer, Collection<String>> map = new HashMap<>();
-               Tuple2<Integer, String> pair = new Tuple2<>();
-               
-               while ((pair = iter.next(pair)) != null) {
-
-                       Integer key = pair.f0;
-                       if (!map.containsKey(key)) {
-                               map.put(key, new ArrayList<String>());
-                       }
-
-                       Collection<String> values = map.get(key);
-                       values.add(pair.f1);
-               }
-
-               return map;
-       }
-       
-       static Map<Integer, Collection<Integer>> 
collectIntPairData(MutableObjectIterator<IntPair> iter)
-       throws Exception
-       {
-               Map<Integer, Collection<Integer>> map = new HashMap<>();
-               IntPair pair = new IntPair();
-               
-               while ((pair = iter.next(pair)) != null) {
-
-                       final int key = pair.getKey();
-                       final int value = pair.getValue();
-                       if (!map.containsKey(key)) {
-                               map.put(key, new ArrayList<Integer>());
-                       }
-
-                       Collection<Integer> values = map.get(key);
-                       values.add(value);
-               }
-
-               return map;
-       }
-
-       /**
-        * Private class used for storage of the expected matches in a hash-map.
-        */
-       static class TupleMatch {
-               
-               private final String left;
-               private final String right;
-
-               public TupleMatch(String left, String right) {
-                       this.left = left;
-                       this.right = right;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       TupleMatch o = (TupleMatch) obj;
-                       return this.left.equals(o.left) && 
this.right.equals(o.right);
-               }
-               
-               @Override
-               public int hashCode() {
-                       return this.left.hashCode() ^ this.right.hashCode();
-               }
-
-               @Override
-               public String toString() {
-                       return left + ", " + right;
-               }
-       }
-       
-       /**
-        * Private class used for storage of the expected matches in a hash-map.
-        */
-       static class TupleIntPairMatch
-       {
-               private final int left;
-               private final String right;
-
-               public TupleIntPairMatch(int left, String right) {
-                       this.left = left;
-                       this.right = right;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       TupleIntPairMatch o = (TupleIntPairMatch) obj;
-                       return this.left == o.left && 
this.right.equals(o.right);
-               }
-               
-               @Override
-               public int hashCode() {
-                       return this.left ^ this.right.hashCode();
-               }
-
-               @Override
-               public String toString() {
-                       return left + ", " + right;
-               }
-       }
-       
-       static final class TupleMatchRemovingJoin implements 
FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, 
Tuple2<Integer, String>>
-       {
-               private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
-               
-               protected TupleMatchRemovingJoin(Map<Integer, 
Collection<TupleMatch>> map) {
-                       this.toRemoveFrom = map;
-               }
-               
-               @Override
-               public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, 
String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
-               {
-                       Integer key = rec1.f0;
-                       String value1 = rec1.f1;
-                       String value2 = rec2.f1;
-                       //System.err.println("rec1 key = "+key+"  rec2 key= 
"+rec2.getField(0, TestData.Key.class));
-                       Collection<TupleMatch> matches = 
this.toRemoveFrom.get(key);
-                       if (matches == null) {
-                               Assert.fail("Match " + key + " - " + value1 + 
":" + value2 + " is unexpected.");
-                       }
-                       
-                       Assert.assertTrue("Produced match was not contained: " 
+ key + " - " + value1 + ":" + value2,
-                               matches.remove(new TupleMatch(value1, value2)));
-                       
-                       if (matches.isEmpty()) {
-                               this.toRemoveFrom.remove(key);
-                       }
-               }
-       }
-       
-       static final class TupleIntPairMatchRemovingMatcher extends 
AbstractRichFunction implements FlatJoinFunction<IntPair, Tuple2<Integer, 
String>, Tuple2<Integer, String>>
-       {
-               private final Map<Integer, Collection<TupleIntPairMatch>> 
toRemoveFrom;
-               
-               protected TupleIntPairMatchRemovingMatcher(Map<Integer, 
Collection<TupleIntPairMatch>> map) {
-                       this.toRemoveFrom = map;
-               }
-               
-               @Override
-               public void join(IntPair rec1, Tuple2<Integer, String> rec2, 
Collector<Tuple2<Integer, String>> out) throws Exception
-               {
-                       final int k = rec1.getKey();
-                       final int v = rec1.getValue(); 
-                       
-                       final Integer key = rec2.f0;
-                       final String value = rec2.f1;
-                       
-                       Assert.assertTrue("Key does not match for matching 
IntPair Tuple combination.", k == key); 
-                       
-                       Collection<TupleIntPairMatch> matches = 
this.toRemoveFrom.get(key);
-                       if (matches == null) {
-                               Assert.fail("Match " + key + " - " + v + ":" + 
value + " is unexpected.");
-                       }
-                       
-                       Assert.assertTrue("Produced match was not contained: " 
+ key + " - " + v + ":" + value,
-                               matches.remove(new TupleIntPairMatch(v, 
value)));
-                       
-                       if (matches.isEmpty()) {
-                               this.toRemoveFrom.remove(key);
-                       }
-               }
-       }
-       
-       static final class IntPairTuplePairComparator extends 
TypePairComparator<IntPair, Tuple2<Integer, String>>
-       {
-               private int reference;
-               
-               @Override
-               public void setReference(IntPair reference) {
-                       this.reference = reference.getKey();    
-               }
-
-               @Override
-               public boolean equalToReference(Tuple2<Integer, String> 
candidate) {
-                       try {
-                               return candidate.f0 == this.reference;
-                       } catch (NullPointerException npex) {
-                               throw new NullKeyFieldException();
-                       }
-               }
-
-               @Override
-               public int compareToReference(Tuple2<Integer, String> 
candidate) {
-                       try {
-                               return candidate.f0 - this.reference;
-                       } catch (NullPointerException npex) {
-                               throw new NullKeyFieldException();
-                       }
-               }
-       }
-       
-       static final class TupleIntPairPairComparator extends 
TypePairComparator<Tuple2<Integer, String>, IntPair>
-       {
-               private int reference;
-               
-               @Override
-               public void setReference(Tuple2<Integer, String> reference) {
-                       this.reference = reference.f0;
-               }
-
-               @Override
-               public boolean equalToReference(IntPair candidate) {
-                       return this.reference == candidate.getKey();
-               }
-
-               @Override
-               public int compareToReference(IntPair candidate) {
-                       return candidate.getKey() - this.reference;
-               }
-       }
-}

Reply via email to