http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
new file mode 100644
index 0000000..5012d1e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import 
org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
+import 
org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
+import 
org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.RecordMatch;
+import 
org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase
+               .RecordMatchRemovingJoin;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.Key;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Test specialized hash join that keeps the build side data (in memory and on 
hard disk)
+ * This is used for iterative tasks.
+ */
+@SuppressWarnings("deprecation")
+public class NonReusingReOpenableHashTableITCase {
+
+       private static final int PAGE_SIZE = 8 * 1024;
+       private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages.
+
+       private static final long SEED1 = 561349061987311L;
+       private static final long SEED2 = 231434613412342L;
+
+       private static final int NUM_PROBES = 3; // number of reopenings of 
hash join
+
+       private final AbstractInvokable parentTask = new DummyInvokable();
+
+       private IOManager ioManager;
+       private MemoryManager memoryManager;
+
+       private TypeSerializer<Record> recordSerializer;
+       private TypeComparator<Record> record1Comparator;
+       private TypeComparator<Record> record2Comparator;
+       private TypePairComparator<Record, Record> recordPairComparator;
+
+
+
+
+       private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+       private TypeSerializer<Record> recordBuildSideAccesssor;
+       private TypeSerializer<Record> recordProbeSideAccesssor;
+       private TypeComparator<Record> recordBuildSideComparator;
+       private TypeComparator<Record> recordProbeSideComparator;
+       private TypePairComparator<Record, Record> pactRecordComparator;
+
+       @SuppressWarnings("unchecked")
+       @Before
+       public void beforeTest()
+       {
+               this.recordSerializer = RecordSerializer.get();
+
+               this.record1Comparator = new RecordComparator(new int[] {0}, 
new Class[] {Key.class});
+               this.record2Comparator = new RecordComparator(new int[] {0}, 
new Class[] {Key.class});
+               this.recordPairComparator = new RecordPairComparator(new int[] 
{0}, new int[] {0}, new Class[] {Key.class});
+
+
+               final int[] keyPos = new int[] {0};
+               final Class<? extends Key>[] keyType = (Class<? extends Key>[]) 
new Class[] { IntValue.class };
+
+               this.recordBuildSideAccesssor = RecordSerializer.get();
+               this.recordProbeSideAccesssor = RecordSerializer.get();
+               this.recordBuildSideComparator = new RecordComparator(keyPos, 
keyType);
+               this.recordProbeSideComparator = new RecordComparator(keyPos, 
keyType);
+               this.pactRecordComparator = new 
HashTableITCase.RecordPairComparatorFirstInt();
+
+               this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, 
PAGE_SIZE);
+               this.ioManager = new IOManagerAsync();
+       }
+
+       @After
+       public void afterTest()
+       {
+               if (this.ioManager != null) {
+                       this.ioManager.shutdown();
+                       if (!this.ioManager.isProperlyShutDown()) {
+                               Assert.fail("I/O manager failed to properly 
shut down.");
+                       }
+                       this.ioManager = null;
+               }
+
+               if (this.memoryManager != null) {
+                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
+                               this.memoryManager.verifyEmpty());
+                       this.memoryManager.shutdown();
+                       this.memoryManager = null;
+               }
+       }
+
+
+       /**
+        * Test behavior with overflow buckets (Overflow buckets must be 
initialized correctly
+        * if the input is reopened again)
+        */
+       @Test
+       public void testOverflow() {
+
+               int buildSize = 1000;
+               int probeSize = 1000;
+               try {
+                       Generator bgen = new Generator(SEED1, 200, 1024, 
KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+                       Generator pgen = new Generator(SEED2, 0, 1024, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+                       final TestData.GeneratorIterator buildInput = new 
TestData.GeneratorIterator(bgen, buildSize);
+                       final TestData.GeneratorIterator probeInput = new 
TestData.GeneratorIterator(pgen, probeSize);
+                       doTest(buildInput,probeInput, bgen, pgen);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+
+       /**
+        * Verify proper operation if the build side is spilled to disk.
+        */
+       @Test
+       public void testDoubleProbeSpilling() {
+
+               int buildSize = 1000;
+               int probeSize = 1000;
+               try {
+                       Generator bgen = new Generator(SEED1, 0, 1024, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
+                       Generator pgen = new Generator(SEED2, 0, 1024, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+                       final TestData.GeneratorIterator buildInput = new 
TestData.GeneratorIterator(bgen, buildSize);
+                       final TestData.GeneratorIterator probeInput = new 
TestData.GeneratorIterator(pgen, probeSize);
+                       doTest(buildInput,probeInput, bgen, pgen);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+
+       /**
+        * This test case verifies that hybrid hash join is able to handle 
multiple probe phases
+        * when the build side fits completely into memory.
+        */
+       @Test
+       public void testDoubleProbeInMemory() {
+
+               int buildSize = 1000;
+               int probeSize = 1000;
+               try {
+                       Generator bgen = new Generator(SEED1, 0, 28, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
+                       Generator pgen = new Generator(SEED2, 0, 28, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+                       final TestData.GeneratorIterator buildInput = new 
TestData.GeneratorIterator(bgen, buildSize);
+                       final TestData.GeneratorIterator probeInput = new 
TestData.GeneratorIterator(pgen, probeSize);
+
+                       doTest(buildInput,probeInput, bgen, pgen);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+
+       private void doTest(TestData.GeneratorIterator buildInput, 
TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws 
Exception {
+               // collect expected data
+               final Map<Key, Collection<RecordMatch>> expectedFirstMatchesMap 
= 
NonReusingHashMatchIteratorITCase.matchRecordValues(NonReusingHashMatchIteratorITCase.collectRecordData(buildInput),
 NonReusingHashMatchIteratorITCase.collectRecordData(probeInput));
+
+               final List<Map<Key, Collection<RecordMatch>>> 
expectedNMatchesMapList = new 
ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES);
+               final JoinFunction[] nMatcher = new 
RecordMatchRemovingJoin[NUM_PROBES];
+               for(int i = 0; i < NUM_PROBES; i++) {
+                       Map<Key, Collection<RecordMatch>> tmp;
+                       expectedNMatchesMapList.add(tmp = 
deepCopy(expectedFirstMatchesMap));
+                       nMatcher[i] = new RecordMatchRemovingJoin(tmp);
+               }
+
+               final JoinFunction firstMatcher = new 
RecordMatchRemovingJoin(expectedFirstMatchesMap);
+
+               final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
+
+               // reset the generators
+               bgen.reset();
+               pgen.reset();
+               buildInput.reset();
+               probeInput.reset();
+
+               // compare with iterator values
+               NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, 
Record> iterator =
+                               new 
NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
+                                               buildInput, probeInput, 
this.recordSerializer, this.record1Comparator,
+                                       this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
+                                       this.memoryManager, ioManager, 
this.parentTask, 1.0);
+
+               iterator.open();
+               // do first join with both inputs
+               while (iterator.callWithNextKey(firstMatcher, collector));
+
+               // assert that each expected match was seen for the first input
+               for (Entry<Key, Collection<RecordMatch>> entry : 
expectedFirstMatchesMap.entrySet()) {
+                       if (!entry.getValue().isEmpty()) {
+                               Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                       }
+               }
+
+               for(int i = 0; i < NUM_PROBES; i++) {
+                       pgen.reset();
+                       probeInput.reset();
+                       // prepare ..
+                       iterator.reopenProbe(probeInput);
+                       // .. and do second join
+                       while (iterator.callWithNextKey(nMatcher[i], 
collector));
+
+                       // assert that each expected match was seen for the 
second input
+                       for (Entry<Key, Collection<RecordMatch>> entry : 
expectedNMatchesMapList.get(i).entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+
+               iterator.close();
+       }
+
+       //
+       //
+       //      Tests taken from HahTableITCase!
+       //
+       //
+
+       private final MutableObjectIterator<Record> getProbeInput(final int 
numKeys,
+                       final int probeValsPerKey, final int repeatedValue1, 
final int repeatedValue2) {
+               MutableObjectIterator<Record> probe1 = new 
UniformRecordGenerator(numKeys, probeValsPerKey, true);
+               MutableObjectIterator<Record> probe2 = new 
ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
+               MutableObjectIterator<Record> probe3 = new 
ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5);
+               List<MutableObjectIterator<Record>> probes = new 
ArrayList<MutableObjectIterator<Record>>();
+               probes.add(probe1);
+               probes.add(probe2);
+               probes.add(probe3);
+               return new UnionIterator<Record>(probes);
+       }
+
+       @Test
+       public void testSpillingHashJoinWithMassiveCollisions() throws 
IOException
+       {
+               // the following two values are known to have a hash-code 
collision on the initial level.
+               // we use them to make sure one partition grows 
over-proportionally large
+               final int REPEATED_VALUE_1 = 40559;
+               final int REPEATED_VALUE_2 = 92882;
+               final int REPEATED_VALUE_COUNT_BUILD = 200000;
+               final int REPEATED_VALUE_COUNT_PROBE = 5;
+
+               final int NUM_KEYS = 1000000;
+               final int BUILD_VALS_PER_KEY = 3;
+               final int PROBE_VALS_PER_KEY = 10;
+
+               // create a build input that gives 3 million pairs with 3 
values sharing the same key, plus 400k pairs with two colliding keys
+               MutableObjectIterator<Record> build1 = new 
UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+               MutableObjectIterator<Record> build2 = new 
ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, 
REPEATED_VALUE_COUNT_BUILD);
+               MutableObjectIterator<Record> build3 = new 
ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, 
REPEATED_VALUE_COUNT_BUILD);
+               List<MutableObjectIterator<Record>> builds = new 
ArrayList<MutableObjectIterator<Record>>();
+               builds.add(build1);
+               builds.add(build2);
+               builds.add(build3);
+               MutableObjectIterator<Record> buildInput = new 
UnionIterator<Record>(builds);
+
+
+
+
+               // allocate the memory for the HashTable
+               List<MemorySegment> memSegments;
+               try {
+                       memSegments = 
this.memoryManager.allocatePages(MEM_OWNER, 896);
+               }
+               catch (MemoryAllocationException maex) {
+                       fail("Memory for the Join could not be provided.");
+                       return;
+               }
+
+               // create the map for validating the results
+               HashMap<Integer, Long> map = new HashMap<Integer, 
Long>(NUM_KEYS);
+
+               // 
----------------------------------------------------------------------------------------
+
+               final ReOpenableMutableHashTable<Record, Record> join = new 
ReOpenableMutableHashTable<Record, Record>(
+                               this.recordBuildSideAccesssor, 
this.recordProbeSideAccesssor,
+                               this.recordBuildSideComparator, 
this.recordProbeSideComparator, this.pactRecordComparator,
+                               memSegments, ioManager);
+
+               for(int probe = 0; probe < NUM_PROBES; probe++) {
+                       // create a probe input that gives 10 million pairs 
with 10 values sharing a key
+                       MutableObjectIterator<Record> probeInput = 
getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+                       if(probe == 0) {
+                               join.open(buildInput, probeInput);
+                       } else {
+                               join.reopenProbe(probeInput);
+                       }
+
+                       Record record;
+                       final Record recordReuse = new Record();
+
+                       while (join.nextRecord())
+                       {
+                               int numBuildValues = 0;
+
+                               final Record probeRec = 
join.getCurrentProbeRecord();
+                               int key = probeRec.getField(0, 
IntValue.class).getValue();
+
+                               HashBucketIterator<Record, Record> buildSide = 
join.getBuildSideIterator();
+                               if ((record = buildSide.next(recordReuse)) != 
null) {
+                                       numBuildValues = 1;
+                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.getField(0, 
IntValue.class).getValue());
+                               }
+                               else {
+                                       fail("No build side values found for a 
probe key.");
+                               }
+                               while ((record = buildSide.next(record)) != 
null) {
+                                       numBuildValues++;
+                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.getField(0, 
IntValue.class).getValue());
+                               }
+
+                               Long contained = map.get(key);
+                               if (contained == null) {
+                                       contained = 
Long.valueOf(numBuildValues);
+                               }
+                               else {
+                                       contained = 
Long.valueOf(contained.longValue() + numBuildValues);
+                               }
+
+                               map.put(key, contained);
+                       }
+               }
+
+               join.close();
+
+               Assert.assertEquals("Wrong number of keys", NUM_KEYS, 
map.size());
+               for (Entry<Integer, Long> entry : map.entrySet()) {
+                       long val = entry.getValue();
+                       int key = entry.getKey();
+
+                       if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) 
{
+                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key,
+                                                       (PROBE_VALS_PER_KEY + 
REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) 
* NUM_PROBES, val);
+                       } else {
+                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key,
+                                                       PROBE_VALS_PER_KEY * 
BUILD_VALS_PER_KEY * NUM_PROBES, val);
+                       }
+               }
+
+
+               // 
----------------------------------------------------------------------------------------
+
+               this.memoryManager.release(join.getFreedMemory());
+       }
+
+       /*
+        * This test is basically identical to the 
"testSpillingHashJoinWithMassiveCollisions" test, only that the number
+        * of repeated values (causing bucket collisions) are large enough to 
make sure that their target partition no longer
+        * fits into memory by itself and needs to be repartitioned in the 
recursion again.
+        */
+       @Test
+       public void testSpillingHashJoinWithTwoRecursions() throws IOException
+       {
+               // the following two values are known to have a hash-code 
collision on the first recursion level.
+               // we use them to make sure one partition grows 
over-proportionally large
+               final int REPEATED_VALUE_1 = 40559;
+               final int REPEATED_VALUE_2 = 92882;
+               final int REPEATED_VALUE_COUNT_BUILD = 200000;
+               final int REPEATED_VALUE_COUNT_PROBE = 5;
+
+               final int NUM_KEYS = 1000000;
+               final int BUILD_VALS_PER_KEY = 3;
+               final int PROBE_VALS_PER_KEY = 10;
+
+               // create a build input that gives 3 million pairs with 3 
values sharing the same key, plus 400k pairs with two colliding keys
+               MutableObjectIterator<Record> build1 = new 
UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+               MutableObjectIterator<Record> build2 = new 
ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, 
REPEATED_VALUE_COUNT_BUILD);
+               MutableObjectIterator<Record> build3 = new 
ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, 
REPEATED_VALUE_COUNT_BUILD);
+               List<MutableObjectIterator<Record>> builds = new 
ArrayList<MutableObjectIterator<Record>>();
+               builds.add(build1);
+               builds.add(build2);
+               builds.add(build3);
+               MutableObjectIterator<Record> buildInput = new 
UnionIterator<Record>(builds);
+
+
+               // allocate the memory for the HashTable
+               List<MemorySegment> memSegments;
+               try {
+                       memSegments = 
this.memoryManager.allocatePages(MEM_OWNER, 896);
+               }
+               catch (MemoryAllocationException maex) {
+                       fail("Memory for the Join could not be provided.");
+                       return;
+               }
+
+               // create the map for validating the results
+               HashMap<Integer, Long> map = new HashMap<Integer, 
Long>(NUM_KEYS);
+
+               // 
----------------------------------------------------------------------------------------
+
+               final ReOpenableMutableHashTable<Record, Record> join = new 
ReOpenableMutableHashTable<Record, Record>(
+                               this.recordBuildSideAccesssor, 
this.recordProbeSideAccesssor,
+                               this.recordBuildSideComparator, 
this.recordProbeSideComparator, this.pactRecordComparator,
+                               memSegments, ioManager);
+               for(int probe = 0; probe < NUM_PROBES; probe++) {
+                       // create a probe input that gives 10 million pairs 
with 10 values sharing a key
+                       MutableObjectIterator<Record> probeInput = 
getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+                       if(probe == 0) {
+                               join.open(buildInput, probeInput);
+                       } else {
+                               join.reopenProbe(probeInput);
+                       }
+                       Record record;
+                       final Record recordReuse = new Record();
+
+                       while (join.nextRecord())
+                       {
+                               int numBuildValues = 0;
+
+                               final Record probeRec = 
join.getCurrentProbeRecord();
+                               int key = probeRec.getField(0, 
IntValue.class).getValue();
+
+                               HashBucketIterator<Record, Record> buildSide = 
join.getBuildSideIterator();
+                               if ((record = buildSide.next(recordReuse)) != 
null) {
+                                       numBuildValues = 1;
+                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.getField(0, 
IntValue.class).getValue());
+                               }
+                               else {
+                                       fail("No build side values found for a 
probe key.");
+                               }
+                               while ((record = buildSide.next(recordReuse)) 
!= null) {
+                                       numBuildValues++;
+                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.getField(0, 
IntValue.class).getValue());
+                               }
+
+                               Long contained = map.get(key);
+                               if (contained == null) {
+                                       contained = 
Long.valueOf(numBuildValues);
+                               }
+                               else {
+                                       contained = 
Long.valueOf(contained.longValue() + numBuildValues);
+                               }
+
+                               map.put(key, contained);
+                       }
+               }
+
+               join.close();
+               Assert.assertEquals("Wrong number of keys", NUM_KEYS, 
map.size());
+               for (Entry<Integer, Long> entry : map.entrySet()) {
+                       long val = entry.getValue();
+                       int key = entry.getKey();
+
+                       if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) 
{
+                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key,
+                                                       (PROBE_VALS_PER_KEY + 
REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) 
* NUM_PROBES, val);
+                       } else {
+                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key,
+                                                       PROBE_VALS_PER_KEY * 
BUILD_VALS_PER_KEY * NUM_PROBES, val);
+                       }
+               }
+
+
+               // 
----------------------------------------------------------------------------------------
+
+               this.memoryManager.release(join.getFreedMemory());
+       }
+
+
+       static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, 
Collection<RecordMatch>> expectedSecondMatchesMap) {
+               Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, 
Collection<RecordMatch>>(expectedSecondMatchesMap.size());
+               for(Entry<Key, Collection<RecordMatch>> entry : 
expectedSecondMatchesMap.entrySet()) {
+                       List<RecordMatch> matches = new 
ArrayList<RecordMatch>(entry.getValue().size());
+                       for(RecordMatch m : entry.getValue()) {
+                               matches.add(m);
+                       }
+                       copy.put(entry.getKey(), matches);
+               }
+               return copy;
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
deleted file mode 100644
index 71f1979..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import 
org.apache.flink.runtime.operators.hash.HashMatchIteratorITCase.RecordMatch;
-import 
org.apache.flink.runtime.operators.hash.HashMatchIteratorITCase.RecordMatchRemovingJoin;
-import 
org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
-import 
org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test specialized hash join that keeps the build side data (in memory and on 
hard disk)
- * This is used for iterative tasks.
- */
-@SuppressWarnings("deprecation")
-public class ReOpenableHashTableITCase {
-       
-       private static final int PAGE_SIZE = 8 * 1024;
-       private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages.
-
-       private static final long SEED1 = 561349061987311L;
-       private static final long SEED2 = 231434613412342L;
-       
-       private static final int NUM_PROBES = 3; // number of reopenings of 
hash join
-       
-       private final AbstractInvokable parentTask = new DummyInvokable();
-
-       private IOManager ioManager;
-       private MemoryManager memoryManager;
-       
-       private TypeSerializer<Record> recordSerializer;
-       private TypeComparator<Record> record1Comparator;
-       private TypeComparator<Record> record2Comparator;
-       private TypePairComparator<Record, Record> recordPairComparator;
-       
-       
-       
-       
-       private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-       private TypeSerializer<Record> recordBuildSideAccesssor;
-       private TypeSerializer<Record> recordProbeSideAccesssor;
-       private TypeComparator<Record> recordBuildSideComparator;
-       private TypeComparator<Record> recordProbeSideComparator;
-       private TypePairComparator<Record, Record> pactRecordComparator;
-
-       @SuppressWarnings("unchecked")
-       @Before
-       public void beforeTest()
-       {
-               this.recordSerializer = RecordSerializer.get();
-               
-               this.record1Comparator = new RecordComparator(new int[] {0}, 
new Class[] {TestData.Key.class});
-               this.record2Comparator = new RecordComparator(new int[] {0}, 
new Class[] {TestData.Key.class});
-               this.recordPairComparator = new RecordPairComparator(new int[] 
{0}, new int[] {0}, new Class[] {TestData.Key.class});
-               
-               
-               final int[] keyPos = new int[] {0};
-               final Class<? extends Key>[] keyType = (Class<? extends Key>[]) 
new Class[] { IntValue.class };
-               
-               this.recordBuildSideAccesssor = RecordSerializer.get();
-               this.recordProbeSideAccesssor = RecordSerializer.get();
-               this.recordBuildSideComparator = new RecordComparator(keyPos, 
keyType);
-               this.recordProbeSideComparator = new RecordComparator(keyPos, 
keyType);
-               this.pactRecordComparator = new 
HashTableITCase.RecordPairComparatorFirstInt();
-               
-               this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, 
PAGE_SIZE);
-               this.ioManager = new IOManagerAsync();
-       }
-
-       @After
-       public void afterTest()
-       {
-               if (this.ioManager != null) {
-                       this.ioManager.shutdown();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
-                       this.ioManager = null;
-               }
-               
-               if (this.memoryManager != null) {
-                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
-                               this.memoryManager.verifyEmpty());
-                       this.memoryManager.shutdown();
-                       this.memoryManager = null;
-               }
-       }
-       
-       
-       /**
-        * Test behavior with overflow buckets (Overflow buckets must be 
initialized correctly 
-        * if the input is reopened again)
-        */
-       @Test
-       public void testOverflow() {
-               
-               int buildSize = 1000;
-               int probeSize = 1000;
-               try {
-                       Generator bgen = new Generator(SEED1, 200, 1024, 
KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-                       Generator pgen = new Generator(SEED2, 0, 1024, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       
-                       final TestData.GeneratorIterator buildInput = new 
TestData.GeneratorIterator(bgen, buildSize);
-                       final TestData.GeneratorIterator probeInput = new 
TestData.GeneratorIterator(pgen, probeSize);
-                       doTest(buildInput,probeInput, bgen, pgen);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       /**
-        * Verify proper operation if the build side is spilled to disk.
-        */
-       @Test
-       public void testDoubleProbeSpilling() {
-               
-               int buildSize = 1000;
-               int probeSize = 1000;
-               try {
-                       Generator bgen = new Generator(SEED1, 0, 1024, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       Generator pgen = new Generator(SEED2, 0, 1024, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       
-                       final TestData.GeneratorIterator buildInput = new 
TestData.GeneratorIterator(bgen, buildSize);
-                       final TestData.GeneratorIterator probeInput = new 
TestData.GeneratorIterator(pgen, probeSize);
-                       doTest(buildInput,probeInput, bgen, pgen);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       /**
-        * This test case verifies that hybrid hash join is able to handle 
multiple probe phases
-        * when the build side fits completely into memory.
-        */
-       @Test
-       public void testDoubleProbeInMemory() {
-               
-               int buildSize = 1000;
-               int probeSize = 1000;
-               try {
-                       Generator bgen = new Generator(SEED1, 0, 28, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       Generator pgen = new Generator(SEED2, 0, 28, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       
-                       final TestData.GeneratorIterator buildInput = new 
TestData.GeneratorIterator(bgen, buildSize);
-                       final TestData.GeneratorIterator probeInput = new 
TestData.GeneratorIterator(pgen, probeSize);
-                       
-                       doTest(buildInput,probeInput, bgen, pgen);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       private void doTest(TestData.GeneratorIterator buildInput, 
TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws 
Exception {
-               // collect expected data
-               final Map<TestData.Key, Collection<RecordMatch>> 
expectedFirstMatchesMap = HashMatchIteratorITCase.matchRecordValues(
-                       HashMatchIteratorITCase.collectRecordData(buildInput),
-                       HashMatchIteratorITCase.collectRecordData(probeInput));
-               
-               final List<Map<TestData.Key, Collection<RecordMatch>>> 
expectedNMatchesMapList = new 
ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES);
-               final JoinFunction[] nMatcher = new 
RecordMatchRemovingJoin[NUM_PROBES];
-               for(int i = 0; i < NUM_PROBES; i++) {
-                       Map<TestData.Key, Collection<RecordMatch>> tmp;
-                       expectedNMatchesMapList.add(tmp = 
deepCopy(expectedFirstMatchesMap));
-                       nMatcher[i] = new RecordMatchRemovingJoin(tmp);
-               }
-               
-               final JoinFunction firstMatcher = new 
RecordMatchRemovingJoin(expectedFirstMatchesMap);
-               
-               final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
-
-               // reset the generators
-               bgen.reset();
-               pgen.reset();
-               buildInput.reset();
-               probeInput.reset();
-
-               // compare with iterator values
-               BuildFirstReOpenableHashMatchIterator<Record, Record, Record> 
iterator = 
-                               new 
BuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
-                                               buildInput, probeInput, 
this.recordSerializer, this.record1Comparator, 
-                                       this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
-                                       this.memoryManager, ioManager, 
this.parentTask, 1.0);
-               
-               iterator.open();
-               // do first join with both inputs
-               while (iterator.callWithNextKey(firstMatcher, collector));
-
-               // assert that each expected match was seen for the first input
-               for (Entry<TestData.Key, Collection<RecordMatch>> entry : 
expectedFirstMatchesMap.entrySet()) {
-                       if (!entry.getValue().isEmpty()) {
-                               Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                       }
-               }
-               
-               for(int i = 0; i < NUM_PROBES; i++) {
-                       pgen.reset();
-                       probeInput.reset();
-                       // prepare ..
-                       iterator.reopenProbe(probeInput);
-                       // .. and do second join
-                       while (iterator.callWithNextKey(nMatcher[i], 
collector));
-                       
-                       // assert that each expected match was seen for the 
second input
-                       for (Entry<TestData.Key, Collection<RecordMatch>> entry 
: expectedNMatchesMapList.get(i).entrySet()) {
-                               if (!entry.getValue().isEmpty()) {
-                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
-                               }
-                       }
-               }
-               
-               iterator.close();
-       }
-       
-       //
-       //
-       //      Tests taken from HahTableITCase!
-       //
-       //
-       
-       private final MutableObjectIterator<Record> getProbeInput(final int 
numKeys,
-                       final int probeValsPerKey, final int repeatedValue1, 
final int repeatedValue2) {
-               MutableObjectIterator<Record> probe1 = new 
UniformRecordGenerator(numKeys, probeValsPerKey, true);
-               MutableObjectIterator<Record> probe2 = new 
ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
-               MutableObjectIterator<Record> probe3 = new 
ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5);
-               List<MutableObjectIterator<Record>> probes = new 
ArrayList<MutableObjectIterator<Record>>();
-               probes.add(probe1);
-               probes.add(probe2);
-               probes.add(probe3);
-               return new UnionIterator<Record>(probes);
-       }
-       
-       @Test
-       public void testSpillingHashJoinWithMassiveCollisions() throws 
IOException
-       {
-               // the following two values are known to have a hash-code 
collision on the initial level.
-               // we use them to make sure one partition grows 
over-proportionally large
-               final int REPEATED_VALUE_1 = 40559;
-               final int REPEATED_VALUE_2 = 92882;
-               final int REPEATED_VALUE_COUNT_BUILD = 200000;
-               final int REPEATED_VALUE_COUNT_PROBE = 5;
-               
-               final int NUM_KEYS = 1000000;
-               final int BUILD_VALS_PER_KEY = 3;
-               final int PROBE_VALS_PER_KEY = 10;
-               
-               // create a build input that gives 3 million pairs with 3 
values sharing the same key, plus 400k pairs with two colliding keys
-               MutableObjectIterator<Record> build1 = new 
UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-               MutableObjectIterator<Record> build2 = new 
ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, 
REPEATED_VALUE_COUNT_BUILD);
-               MutableObjectIterator<Record> build3 = new 
ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, 
REPEATED_VALUE_COUNT_BUILD);
-               List<MutableObjectIterator<Record>> builds = new 
ArrayList<MutableObjectIterator<Record>>();
-               builds.add(build1);
-               builds.add(build2);
-               builds.add(build3);
-               MutableObjectIterator<Record> buildInput = new 
UnionIterator<Record>(builds);
-       
-               
-               
-
-               // allocate the memory for the HashTable
-               List<MemorySegment> memSegments;
-               try {
-                       memSegments = 
this.memoryManager.allocatePages(MEM_OWNER, 896);
-               }
-               catch (MemoryAllocationException maex) {
-                       fail("Memory for the Join could not be provided.");
-                       return;
-               }
-               
-               // create the map for validating the results
-               HashMap<Integer, Long> map = new HashMap<Integer, 
Long>(NUM_KEYS);
-               
-               // 
----------------------------------------------------------------------------------------
-               
-               final ReOpenableMutableHashTable<Record, Record> join = new 
ReOpenableMutableHashTable<Record, Record>(
-                               this.recordBuildSideAccesssor, 
this.recordProbeSideAccesssor, 
-                               this.recordBuildSideComparator, 
this.recordProbeSideComparator, this.pactRecordComparator,
-                               memSegments, ioManager);
-               
-               for(int probe = 0; probe < NUM_PROBES; probe++) {
-                       // create a probe input that gives 10 million pairs 
with 10 values sharing a key
-                       MutableObjectIterator<Record> probeInput = 
getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
-                       if(probe == 0) {
-                               join.open(buildInput, probeInput);
-                       } else {
-                               join.reopenProbe(probeInput);
-                       }
-               
-                       Record record;
-                       final Record recordReuse = new Record();
-
-                       while (join.nextRecord())
-                       {
-                               int numBuildValues = 0;
-               
-                               final Record probeRec = 
join.getCurrentProbeRecord();
-                               int key = probeRec.getField(0, 
IntValue.class).getValue();
-                               
-                               HashBucketIterator<Record, Record> buildSide = 
join.getBuildSideIterator();
-                               if ((record = buildSide.next(recordReuse)) != 
null) {
-                                       numBuildValues = 1;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.getField(0, 
IntValue.class).getValue()); 
-                               }
-                               else {
-                                       fail("No build side values found for a 
probe key.");
-                               }
-                               while ((record = buildSide.next(record)) != 
null) {
-                                       numBuildValues++;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.getField(0, 
IntValue.class).getValue());
-                               }
-                               
-                               Long contained = map.get(key);
-                               if (contained == null) {
-                                       contained = 
Long.valueOf(numBuildValues);
-                               }
-                               else {
-                                       contained = 
Long.valueOf(contained.longValue() + numBuildValues);
-                               }
-                               
-                               map.put(key, contained);
-                       }
-               }
-               
-               join.close();
-               
-               Assert.assertEquals("Wrong number of keys", NUM_KEYS, 
map.size());
-               for (Map.Entry<Integer, Long> entry : map.entrySet()) {
-                       long val = entry.getValue();
-                       int key = entry.getKey();
-       
-                       if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) 
{
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key, 
-                                                       (PROBE_VALS_PER_KEY + 
REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) 
* NUM_PROBES, val);
-                       } else {
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key, 
-                                                       PROBE_VALS_PER_KEY * 
BUILD_VALS_PER_KEY * NUM_PROBES, val);
-                       }
-               }
-               
-               
-               // 
----------------------------------------------------------------------------------------
-               
-               this.memoryManager.release(join.getFreedMemory());
-       }
-       
-       /*
-        * This test is basically identical to the 
"testSpillingHashJoinWithMassiveCollisions" test, only that the number
-        * of repeated values (causing bucket collisions) are large enough to 
make sure that their target partition no longer
-        * fits into memory by itself and needs to be repartitioned in the 
recursion again.
-        */
-       @Test
-       public void testSpillingHashJoinWithTwoRecursions() throws IOException
-       {
-               // the following two values are known to have a hash-code 
collision on the first recursion level.
-               // we use them to make sure one partition grows 
over-proportionally large
-               final int REPEATED_VALUE_1 = 40559;
-               final int REPEATED_VALUE_2 = 92882;
-               final int REPEATED_VALUE_COUNT_BUILD = 200000;
-               final int REPEATED_VALUE_COUNT_PROBE = 5;
-               
-               final int NUM_KEYS = 1000000;
-               final int BUILD_VALS_PER_KEY = 3;
-               final int PROBE_VALS_PER_KEY = 10;
-               
-               // create a build input that gives 3 million pairs with 3 
values sharing the same key, plus 400k pairs with two colliding keys
-               MutableObjectIterator<Record> build1 = new 
UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-               MutableObjectIterator<Record> build2 = new 
ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, 
REPEATED_VALUE_COUNT_BUILD);
-               MutableObjectIterator<Record> build3 = new 
ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, 
REPEATED_VALUE_COUNT_BUILD);
-               List<MutableObjectIterator<Record>> builds = new 
ArrayList<MutableObjectIterator<Record>>();
-               builds.add(build1);
-               builds.add(build2);
-               builds.add(build3);
-               MutableObjectIterator<Record> buildInput = new 
UnionIterator<Record>(builds);
-       
-
-               // allocate the memory for the HashTable
-               List<MemorySegment> memSegments;
-               try {
-                       memSegments = 
this.memoryManager.allocatePages(MEM_OWNER, 896);
-               }
-               catch (MemoryAllocationException maex) {
-                       fail("Memory for the Join could not be provided.");
-                       return;
-               }
-               
-               // create the map for validating the results
-               HashMap<Integer, Long> map = new HashMap<Integer, 
Long>(NUM_KEYS);
-               
-               // 
----------------------------------------------------------------------------------------
-               
-               final ReOpenableMutableHashTable<Record, Record> join = new 
ReOpenableMutableHashTable<Record, Record>(
-                               this.recordBuildSideAccesssor, 
this.recordProbeSideAccesssor, 
-                               this.recordBuildSideComparator, 
this.recordProbeSideComparator, this.pactRecordComparator,
-                               memSegments, ioManager);
-               for(int probe = 0; probe < NUM_PROBES; probe++) {
-                       // create a probe input that gives 10 million pairs 
with 10 values sharing a key
-                       MutableObjectIterator<Record> probeInput = 
getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
-                       if(probe == 0) {
-                               join.open(buildInput, probeInput);
-                       } else {
-                               join.reopenProbe(probeInput);
-                       }
-                       Record record;
-                       final Record recordReuse = new Record();
-
-                       while (join.nextRecord())
-                       {       
-                               int numBuildValues = 0;
-                               
-                               final Record probeRec = 
join.getCurrentProbeRecord();
-                               int key = probeRec.getField(0, 
IntValue.class).getValue();
-                               
-                               HashBucketIterator<Record, Record> buildSide = 
join.getBuildSideIterator();
-                               if ((record = buildSide.next(recordReuse)) != 
null) {
-                                       numBuildValues = 1;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.getField(0, 
IntValue.class).getValue()); 
-                               }
-                               else {
-                                       fail("No build side values found for a 
probe key.");
-                               }
-                               while ((record = buildSide.next(recordReuse)) 
!= null) {
-                                       numBuildValues++;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.getField(0, 
IntValue.class).getValue());
-                               }
-                               
-                               Long contained = map.get(key);
-                               if (contained == null) {
-                                       contained = 
Long.valueOf(numBuildValues);
-                               }
-                               else {
-                                       contained = 
Long.valueOf(contained.longValue() + numBuildValues);
-                               }
-                               
-                               map.put(key, contained);
-                       }
-               }
-               
-               join.close();
-               Assert.assertEquals("Wrong number of keys", NUM_KEYS, 
map.size());
-               for (Map.Entry<Integer, Long> entry : map.entrySet()) {
-                       long val = entry.getValue();
-                       int key = entry.getKey();
-       
-                       if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) 
{
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key, 
-                                                       (PROBE_VALS_PER_KEY + 
REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) 
* NUM_PROBES, val);
-                       } else {
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key, 
-                                                       PROBE_VALS_PER_KEY * 
BUILD_VALS_PER_KEY * NUM_PROBES, val);
-                       }
-               }
-               
-               
-               // 
----------------------------------------------------------------------------------------
-               
-               this.memoryManager.release(join.getFreedMemory());
-       }
-       
-       
-       static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, 
Collection<RecordMatch>> expectedSecondMatchesMap) {
-               Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, 
Collection<RecordMatch>>(expectedSecondMatchesMap.size());
-               for(Map.Entry<Key, Collection<RecordMatch>> entry : 
expectedSecondMatchesMap.entrySet()) {
-                       List<RecordMatch> matches = new 
ArrayList<RecordMatch>(entry.getValue().size());
-                       for(RecordMatch m : entry.getValue()) {
-                               matches.add(m);
-                       }
-                       copy.put(entry.getKey(), matches);
-               }
-               return copy;
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
new file mode 100644
index 0000000..18cd8d0
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
@@ -0,0 +1,778 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
+import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "deprecation"})
+public class ReusingHashMatchIteratorITCase {
+       
/** Total memory handed to the {@link DefaultMemoryManager} created in {@code beforeTest()}. */
private static final int MEMORY_SIZE = 16000000;

/**
 * Sizes passed to the {@link TestData.GeneratorIterator}s of the first and second input
 * (presumably record counts). The "HighNumberOfCommonKeys" tests shadow these with locals.
 */
private static final int INPUT_1_SIZE = 20000;
private static final int INPUT_2_SIZE = 1000;

/** Fixed seeds for the two {@link Generator}s, so the generated inputs are reproducible. */
private static final long SEED1 = 561349061987311L;
private static final long SEED2 = 231434613412342L;

/** Stand-in owner task passed to every hash-match iterator under test. */
private final AbstractInvokable parentTask = new DummyInvokable();

// Runtime services: created in beforeTest(), shut down (and checked for leaks) in afterTest().
private IOManager ioManager;
private MemoryManager memoryManager;

// Serializer/comparators for Record inputs; beforeTest() keys them on field 0 as a TestData.Key.
private TypeSerializer<Record> recordSerializer;
private TypeComparator<Record> record1Comparator;
private TypeComparator<Record> record2Comparator;
private TypePairComparator<Record, Record> recordPairComparator;

// Serializer/comparator for IntPair inputs, plus the IntPair<->Record pair comparators
// used by the mixed-data-type tests.
private TypeSerializer<IntPair> pairSerializer;
private TypeComparator<IntPair> pairComparator;
private TypePairComparator<IntPair, Record> pairRecordPairComparator;
private TypePairComparator<Record, IntPair> recordPairPairComparator;
+
+
+       @SuppressWarnings("unchecked")
+       @Before
+       public void beforeTest() {
+               this.recordSerializer = RecordSerializer.get();
+               
+               this.record1Comparator = new RecordComparator(new int[] {0}, 
new Class[] {TestData.Key.class});
+               this.record2Comparator = new RecordComparator(new int[] {0}, 
new Class[] {TestData.Key.class});
+               
+               this.recordPairComparator = new RecordPairComparator(new int[] 
{0}, new int[] {0}, new Class[] {TestData.Key.class});
+               
+               this.pairSerializer = new IntPairSerializer();
+               this.pairComparator = new IntPairComparator();
+               this.pairRecordPairComparator = new 
IntPairRecordPairComparator();
+               this.recordPairPairComparator = new 
RecordIntPairPairComparator();
+               
+               this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+               this.ioManager = new IOManagerAsync();
+       }
+
+       @After
+       public void afterTest() {
+               if (this.ioManager != null) {
+                       this.ioManager.shutdown();
+                       if (!this.ioManager.isProperlyShutDown()) {
+                               Assert.fail("I/O manager failed to properly 
shut down.");
+                       }
+                       this.ioManager = null;
+               }
+               
+               if (this.memoryManager != null) {
+                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
+                               this.memoryManager.verifyEmpty());
+                       this.memoryManager.shutdown();
+                       this.memoryManager = null;
+               }
+       }
+
+
+       @Test
+       public void testBuildFirst() {
+               try {
+                       Generator generator1 = new Generator(SEED1, 500, 4096, 
KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       Generator generator2 = new Generator(SEED2, 500, 2048, 
KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       
+                       final TestData.GeneratorIterator input1 = new 
TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.GeneratorIterator input2 = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+                       
+                       // collect expected data
+                       final Map<TestData.Key, Collection<RecordMatch>> 
expectedMatchesMap = matchRecordValues(
+                               collectRecordData(input1),
+                               collectRecordData(input2));
+                       
+                       final JoinFunction matcher = new 
RecordMatchRemovingJoin(expectedMatchesMap);
+                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
+       
+                       // reset the generators
+                       generator1.reset();
+                       generator2.reset();
+                       input1.reset();
+                       input2.reset();
+       
+                       // compare with iterator values
+                       ReusingBuildFirstHashMatchIterator<Record, Record, 
Record> iterator =
+                                       new 
ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+                                               input1, input2, 
this.recordSerializer, this.record1Comparator, 
+                                               this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
+                                               this.memoryManager, ioManager, 
this.parentTask, 1.0);
+                       
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(matcher, collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<TestData.Key, Collection<RecordMatch>> entry 
: expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testBuildFirstWithHighNumberOfCommonKeys()
+       {
+               // the size of the left and right inputs
+               final int INPUT_1_SIZE = 200;
+               final int INPUT_2_SIZE = 100;
+               
+               final int INPUT_1_DUPLICATES = 10;
+               final int INPUT_2_DUPLICATES = 2000;
+               final int DUPLICATE_KEY = 13;
+               
+               try {
+                       Generator generator1 = new Generator(SEED1, 500, 4096, 
KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       Generator generator2 = new Generator(SEED2, 500, 2048, 
KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       
+                       final TestData.GeneratorIterator gen1Iter = new 
TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.GeneratorIterator gen2Iter = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+                       
+                       final TestData.ConstantValueIterator const1Iter = new 
TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", 
INPUT_1_DUPLICATES);
+                       final TestData.ConstantValueIterator const2Iter = new 
TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate 
Keys", INPUT_2_DUPLICATES);
+                       
+                       final List<MutableObjectIterator<Record>> inList1 = new 
ArrayList<MutableObjectIterator<Record>>();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+                       
+                       final List<MutableObjectIterator<Record>> inList2 = new 
ArrayList<MutableObjectIterator<Record>>();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+                       
+                       MutableObjectIterator<Record> input1 = new 
UnionIterator<Record>(inList1);
+                       MutableObjectIterator<Record> input2 = new 
UnionIterator<Record>(inList2);
+                       
+                       
+                       // collect expected data
+                       final Map<TestData.Key, Collection<RecordMatch>> 
expectedMatchesMap = matchRecordValues(
+                               collectRecordData(input1),
+                               collectRecordData(input2));
+                       
+                       // re-create the whole thing for actual processing
+                       
+                       // reset the generators and iterators
+                       generator1.reset();
+                       generator2.reset();
+                       const1Iter.reset();
+                       const2Iter.reset();
+                       gen1Iter.reset();
+                       gen2Iter.reset();
+                       
+                       inList1.clear();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+                       
+                       inList2.clear();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+       
+                       input1 = new UnionIterator<Record>(inList1);
+                       input2 = new UnionIterator<Record>(inList2);
+                       
+                       final JoinFunction matcher = new 
RecordMatchRemovingJoin(expectedMatchesMap);
+                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
+       
+                       ReusingBuildFirstHashMatchIterator<Record, Record, 
Record> iterator =
+                                       new 
ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+                                               input1, input2, 
this.recordSerializer, this.record1Comparator, 
+                                               this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
+                                               this.memoryManager, ioManager, 
this.parentTask, 1.0);
+
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(matcher, collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<TestData.Key, Collection<RecordMatch>> entry 
: expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testBuildSecond() {
+               try {
+                       Generator generator1 = new Generator(SEED1, 500, 4096, 
KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       Generator generator2 = new Generator(SEED2, 500, 2048, 
KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       
+                       final TestData.GeneratorIterator input1 = new 
TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.GeneratorIterator input2 = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+                       
+                       // collect expected data
+                       final Map<TestData.Key, Collection<RecordMatch>> 
expectedMatchesMap = matchRecordValues(
+                               collectRecordData(input1),
+                               collectRecordData(input2));
+                       
+                       final JoinFunction matcher = new 
RecordMatchRemovingJoin(expectedMatchesMap);
+                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
+       
+                       // reset the generators
+                       generator1.reset();
+                       generator2.reset();
+                       input1.reset();
+                       input2.reset();
+       
+                       // compare with iterator values                 
+                       ReusingBuildSecondHashMatchIterator<Record, Record, 
Record> iterator =
+                               new ReusingBuildSecondHashMatchIterator<Record, 
Record, Record>(
+                                       input1, input2, this.recordSerializer, 
this.record1Comparator, 
+                                       this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
+                                       this.memoryManager, ioManager, 
this.parentTask, 1.0);
+
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(matcher, collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<TestData.Key, Collection<RecordMatch>> entry 
: expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testBuildSecondWithHighNumberOfCommonKeys()
+       {
+               // the size of the left and right inputs
+               final int INPUT_1_SIZE = 200;
+               final int INPUT_2_SIZE = 100;
+               
+               final int INPUT_1_DUPLICATES = 10;
+               final int INPUT_2_DUPLICATES = 2000;
+               final int DUPLICATE_KEY = 13;
+               
+               try {
+                       Generator generator1 = new Generator(SEED1, 500, 4096, 
KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       Generator generator2 = new Generator(SEED2, 500, 2048, 
KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       
+                       final TestData.GeneratorIterator gen1Iter = new 
TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.GeneratorIterator gen2Iter = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+                       
+                       final TestData.ConstantValueIterator const1Iter = new 
TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", 
INPUT_1_DUPLICATES);
+                       final TestData.ConstantValueIterator const2Iter = new 
TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate 
Keys", INPUT_2_DUPLICATES);
+                       
+                       final List<MutableObjectIterator<Record>> inList1 = new 
ArrayList<MutableObjectIterator<Record>>();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+                       
+                       final List<MutableObjectIterator<Record>> inList2 = new 
ArrayList<MutableObjectIterator<Record>>();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+                       
+                       MutableObjectIterator<Record> input1 = new 
UnionIterator<Record>(inList1);
+                       MutableObjectIterator<Record> input2 = new 
UnionIterator<Record>(inList2);
+                       
+                       
+                       // collect expected data
+                       final Map<TestData.Key, Collection<RecordMatch>> 
expectedMatchesMap = matchRecordValues(
+                               collectRecordData(input1),
+                               collectRecordData(input2));
+                       
+                       // re-create the whole thing for actual processing
+                       
+                       // reset the generators and iterators
+                       generator1.reset();
+                       generator2.reset();
+                       const1Iter.reset();
+                       const2Iter.reset();
+                       gen1Iter.reset();
+                       gen2Iter.reset();
+                       
+                       inList1.clear();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+                       
+                       inList2.clear();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+       
+                       input1 = new UnionIterator<Record>(inList1);
+                       input2 = new UnionIterator<Record>(inList2);
+                       
+                       final JoinFunction matcher = new 
RecordMatchRemovingJoin(expectedMatchesMap);
+                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
+       
+                       ReusingBuildSecondHashMatchIterator<Record, Record, 
Record> iterator =
+                               new ReusingBuildSecondHashMatchIterator<Record, 
Record, Record>(
+                                       input1, input2, this.recordSerializer, 
this.record1Comparator, 
+                                       this.recordSerializer, 
this.record2Comparator, this.recordPairComparator,
+                                       this.memoryManager, ioManager, 
this.parentTask, 1.0);
+                       
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(matcher, collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<TestData.Key, Collection<RecordMatch>> entry 
: expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testBuildFirstWithMixedDataTypes() {
+               try {
+                       MutableObjectIterator<IntPair> input1 = new 
UniformIntPairGenerator(500, 40, false);
+                       
+                       final Generator generator2 = new Generator(SEED2, 500, 
2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       final TestData.GeneratorIterator input2 = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+                       
+                       // collect expected data
+                       final Map<TestData.Key, Collection<RecordIntPairMatch>> 
expectedMatchesMap = matchRecordIntPairValues(
+                               collectIntPairData(input1),
+                               collectRecordData(input2));
+                       
+                       final FlatJoinFunction<IntPair, Record, Record> matcher 
= new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
+                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
+       
+                       // reset the generators
+                       input1 = new UniformIntPairGenerator(500, 40, false);
+                       generator2.reset();
+                       input2.reset();
+       
+                       // compare with iterator values
+                       ReusingBuildSecondHashMatchIterator<IntPair, Record, 
Record> iterator =
+                                       new 
ReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
+                                               input1, input2, 
this.pairSerializer, this.pairComparator,
+                                               this.recordSerializer, 
this.record2Comparator, this.pairRecordPairComparator,
+                                               this.memoryManager, 
this.ioManager, this.parentTask, 1.0);
+                       
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(matcher, collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<TestData.Key, 
Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testBuildSecondWithMixedDataTypes() {
+               try {
+                       MutableObjectIterator<IntPair> input1 = new 
UniformIntPairGenerator(500, 40, false);
+                       
+                       final Generator generator2 = new Generator(SEED2, 500, 
2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       final TestData.GeneratorIterator input2 = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+                       
+                       // collect expected data
+                       final Map<TestData.Key, Collection<RecordIntPairMatch>> 
expectedMatchesMap = matchRecordIntPairValues(
+                               collectIntPairData(input1),
+                               collectRecordData(input2));
+                       
+                       final FlatJoinFunction<IntPair, Record, Record> matcher 
= new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
+                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
+       
+                       // reset the generators
+                       input1 = new UniformIntPairGenerator(500, 40, false);
+                       generator2.reset();
+                       input2.reset();
+       
+                       // compare with iterator values
+                       ReusingBuildFirstHashMatchIterator<IntPair, Record, 
Record> iterator =
+                                       new 
ReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
+                                               input1, input2, 
this.pairSerializer, this.pairComparator, 
+                                               this.recordSerializer, 
this.record2Comparator, this.recordPairPairComparator,
+                                               this.memoryManager, 
this.ioManager, this.parentTask, 1.0);
+                       
+                       iterator.open();
+                       
+                       while (iterator.callWithNextKey(matcher, collector));
+                       
+                       iterator.close();
+       
+                       // assert that each expected match was seen
+                       for (Entry<TestData.Key, 
Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + 
entry.getKey() + " is not empty");
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                                    Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       
+       
+       static Map<TestData.Key, Collection<RecordMatch>> matchRecordValues(
+                       Map<TestData.Key, Collection<TestData.Value>> leftMap,
+                       Map<TestData.Key, Collection<TestData.Value>> rightMap)
+       {
+               Map<TestData.Key, Collection<RecordMatch>> map = new 
HashMap<TestData.Key, Collection<RecordMatch>>();
+
+               for (TestData.Key key : leftMap.keySet()) {
+                       Collection<TestData.Value> leftValues = 
leftMap.get(key);
+                       Collection<TestData.Value> rightValues = 
rightMap.get(key);
+
+                       if (rightValues == null) {
+                               continue;
+                       }
+
+                       if (!map.containsKey(key)) {
+                               map.put(key, new ArrayList<RecordMatch>());
+                       }
+
+                       Collection<RecordMatch> matchedValues = map.get(key);
+
+                       for (TestData.Value leftValue : leftValues) {
+                               for (TestData.Value rightValue : rightValues) {
+                                       matchedValues.add(new 
RecordMatch(leftValue, rightValue));
+                               }
+                       }
+               }
+
+               return map;
+       }
+       
+       static Map<TestData.Key, Collection<RecordIntPairMatch>> 
matchRecordIntPairValues(
+               Map<Integer, Collection<Integer>> leftMap,
+               Map<TestData.Key, Collection<TestData.Value>> rightMap)
+       {
+               final Map<TestData.Key, Collection<RecordIntPairMatch>> map = 
new HashMap<TestData.Key, Collection<RecordIntPairMatch>>();
+       
+               for (Integer i : leftMap.keySet()) {
+                       
+                       final TestData.Key key = new TestData.Key(i.intValue());
+                       
+                       final Collection<Integer> leftValues = leftMap.get(i);
+                       final Collection<TestData.Value> rightValues = 
rightMap.get(key);
+       
+                       if (rightValues == null) {
+                               continue;
+                       }
+       
+                       if (!map.containsKey(key)) {
+                               map.put(key, new 
ArrayList<RecordIntPairMatch>());
+                       }
+       
+                       final Collection<RecordIntPairMatch> matchedValues = 
map.get(key);
+       
+                       for (Integer v : leftValues) {
+                               for (TestData.Value val : rightValues) {
+                                       matchedValues.add(new 
RecordIntPairMatch(v, val));
+                               }
+                       }
+               }
+       
+               return map;
+       }
+
+       
+       static Map<TestData.Key, Collection<TestData.Value>> 
collectRecordData(MutableObjectIterator<Record> iter)
+       throws Exception
+       {
+               Map<TestData.Key, Collection<TestData.Value>> map = new 
HashMap<TestData.Key, Collection<TestData.Value>>();
+               Record pair = new Record();
+               
+               while ((pair = iter.next(pair)) != null) {
+
+                       TestData.Key key = pair.getField(0, TestData.Key.class);
+                       if (!map.containsKey(key)) {
+                               map.put(new TestData.Key(key.getKey()), new 
ArrayList<TestData.Value>());
+                       }
+
+                       Collection<TestData.Value> values = map.get(key);
+                       values.add(new TestData.Value(pair.getField(1, 
TestData.Value.class).getValue()));
+               }
+
+               return map;
+       }
+       
+       static Map<Integer, Collection<Integer>> 
collectIntPairData(MutableObjectIterator<IntPair> iter)
+       throws Exception
+       {
+               Map<Integer, Collection<Integer>> map = new HashMap<Integer, 
Collection<Integer>>();
+               IntPair pair = new IntPair();
+               
+               while ((pair = iter.next(pair)) != null) {
+
+                       final int key = pair.getKey();
+                       final int value = pair.getValue();
+                       if (!map.containsKey(key)) {
+                               map.put(key, new ArrayList<Integer>());
+                       }
+
+                       Collection<Integer> values = map.get(key);
+                       values.add(value);
+               }
+
+               return map;
+       }
+
+       /**
+        * Private class used for storage of the expected matches in a hash-map.
+        */
+       static class RecordMatch {
+               
+               private final Value left;
+               private final Value right;
+
+               public RecordMatch(Value left, Value right) {
+                       this.left = left;
+                       this.right = right;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       RecordMatch o = (RecordMatch) obj;
+                       return this.left.equals(o.left) && 
this.right.equals(o.right);
+               }
+               
+               @Override
+               public int hashCode() {
+                       return this.left.hashCode() ^ this.right.hashCode();
+               }
+
+               @Override
+               public String toString() {
+                       return left + ", " + right;
+               }
+       }
+       
+       /**
+        * Private class used for storage of the expected matches in a hash-map.
+        */
+       static class RecordIntPairMatch
+       {
+               private final int left;
+               private final Value right;
+
+               public RecordIntPairMatch(int left, Value right) {
+                       this.left = left;
+                       this.right = right;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       RecordIntPairMatch o = (RecordIntPairMatch) obj;
+                       return this.left == o.left && 
this.right.equals(o.right);
+               }
+               
+               @Override
+               public int hashCode() {
+                       return this.left ^ this.right.hashCode();
+               }
+
+               @Override
+               public String toString() {
+                       return left + ", " + right;
+               }
+       }
+       
+       static final class RecordMatchRemovingJoin extends JoinFunction
+       {
+               private final Map<TestData.Key, Collection<RecordMatch>> 
toRemoveFrom;
+               
+               protected RecordMatchRemovingJoin(Map<TestData.Key, 
Collection<RecordMatch>> map) {
+                       this.toRemoveFrom = map;
+               }
+               
+               @Override
+               public void join(Record rec1, Record rec2, Collector<Record> 
out) throws Exception
+               {
+                       TestData.Key key = rec1.getField(0, TestData.Key.class);
+                       TestData.Value value1 = rec1.getField(1, 
TestData.Value.class);
+                       TestData.Value value2 = rec2.getField(1, 
TestData.Value.class);
+                       //System.err.println("rec1 key = "+key+"  rec2 key= 
"+rec2.getField(0, TestData.Key.class));
+                       Collection<RecordMatch> matches = 
this.toRemoveFrom.get(key);
+                       if (matches == null) {
+                               Assert.fail("Match " + key + " - " + value1 + 
":" + value2 + " is unexpected.");
+                       }
+                       
+                       Assert.assertTrue("Produced match was not contained: " 
+ key + " - " + value1 + ":" + value2,
+                               matches.remove(new RecordMatch(value1, 
value2)));
+                       
+                       if (matches.isEmpty()) {
+                               this.toRemoveFrom.remove(key);
+                       }
+               }
+       }
+       
+       static final class RecordIntPairMatchRemovingMatcher extends 
AbstractRichFunction implements FlatJoinFunction<IntPair, Record, Record>
+       {
+               private final Map<TestData.Key, Collection<RecordIntPairMatch>> 
toRemoveFrom;
+               
+               protected RecordIntPairMatchRemovingMatcher(Map<TestData.Key, 
Collection<RecordIntPairMatch>> map) {
+                       this.toRemoveFrom = map;
+               }
+               
+               @Override
+               public void join(IntPair rec1, Record rec2, Collector<Record> 
out) throws Exception
+               {
+                       final int k = rec1.getKey();
+                       final int v = rec1.getValue(); 
+                       
+                       final TestData.Key key = rec2.getField(0, 
TestData.Key.class);
+                       final TestData.Value value = rec2.getField(1, 
TestData.Value.class);
+                       
+                       Assert.assertTrue("Key does not match for matching 
IntPair Record combination.", k == key.getKey()); 
+                       
+                       Collection<RecordIntPairMatch> matches = 
this.toRemoveFrom.get(key);
+                       if (matches == null) {
+                               Assert.fail("Match " + key + " - " + v + ":" + 
value + " is unexpected.");
+                       }
+                       
+                       Assert.assertTrue("Produced match was not contained: " 
+ key + " - " + v + ":" + value,
+                               matches.remove(new RecordIntPairMatch(v, 
value)));
+                       
+                       if (matches.isEmpty()) {
+                               this.toRemoveFrom.remove(key);
+                       }
+               }
+       }
+       
+       static final class IntPairRecordPairComparator extends 
TypePairComparator<IntPair, Record>
+       {
+               private int reference;
+               
+               @Override
+               public void setReference(IntPair reference) {
+                       this.reference = reference.getKey();    
+               }
+
+               @Override
+               public boolean equalToReference(Record candidate) {
+                       try {
+                               final IntValue i = candidate.getField(0, 
IntValue.class);
+                               return i.getValue() == this.reference;
+                       } catch (NullPointerException npex) {
+                               throw new NullKeyFieldException();
+                       }
+               }
+
+               @Override
+               public int compareToReference(Record candidate) {
+                       try {
+                               final IntValue i = candidate.getField(0, 
IntValue.class);
+                               return i.getValue() - this.reference;
+                       } catch (NullPointerException npex) {
+                               throw new NullKeyFieldException();
+                       }
+               }
+       }
+       
+       static final class RecordIntPairPairComparator extends 
TypePairComparator<Record, IntPair>
+       {
+               private int reference;
+               
+               @Override
+               public void setReference(Record reference) {
+                       this.reference = reference.getField(0, 
IntValue.class).getValue();
+               }
+
+               @Override
+               public boolean equalToReference(IntPair candidate) {
+                       return this.reference == candidate.getKey();
+               }
+
+               @Override
+               public int compareToReference(IntPair candidate) {
+                       return candidate.getKey() - this.reference;
+               }
+       }
+}

Reply via email to