http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java new file mode 100644 index 0000000..5012d1e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.record.RecordComparator; +import org.apache.flink.api.common.typeutils.record.RecordPairComparator; +import org.apache.flink.api.common.typeutils.record.RecordSerializer; +import org.apache.flink.api.java.record.functions.JoinFunction; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator; +import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator; +import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.RecordMatch; +import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase + .RecordMatchRemovingJoin; +import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.runtime.operators.testutils.TestData; +import org.apache.flink.runtime.operators.testutils.TestData.Generator; +import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode; +import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode; +import org.apache.flink.runtime.operators.testutils.TestData.Key; +import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; +import 
org.apache.flink.runtime.operators.testutils.UnionIterator; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.Record; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import static org.junit.Assert.fail; + +/** + * Tests the specialized hash join that keeps the build-side data (in memory and on disk), + * so that the probe side can be re-opened several times. This is used for iterative tasks. + */ +@SuppressWarnings("deprecation") +public class NonReusingReOpenableHashTableITCase { + + private static final int PAGE_SIZE = 8 * 1024; + private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 1000 pages of 8 KiB each. + + private static final long SEED1 = 561349061987311L; + private static final long SEED2 = 231434613412342L; + + private static final int NUM_PROBES = 3; // doTest re-opens the probe side this many times after the first join; the spilling tests probe this many times in total + + private final AbstractInvokable parentTask = new DummyInvokable(); + + private IOManager ioManager; + private MemoryManager memoryManager; + /* serializer and comparators (TestData.Key in field 0) used by doTest */ + private TypeSerializer<Record> recordSerializer; + private TypeComparator<Record> record1Comparator; + private TypeComparator<Record> record2Comparator; + private TypePairComparator<Record, Record> recordPairComparator; + + + + /* page owner for the spilling tests, plus the serializers and IntValue-key comparators they pass to ReOpenableMutableHashTable */ + private static final AbstractInvokable MEM_OWNER = new DummyInvokable(); + private TypeSerializer<Record> recordBuildSideAccesssor; + private TypeSerializer<Record> recordProbeSideAccesssor; + private TypeComparator<Record> recordBuildSideComparator; + private TypeComparator<Record> recordProbeSideComparator; + private TypePairComparator<Record, Record> pactRecordComparator; + + @SuppressWarnings("unchecked") + @Before + public void beforeTest() + { + this.recordSerializer = RecordSerializer.get(); + 
this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class}); + this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class}); + this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {Key.class}); + + + final int[] keyPos = new int[] {0}; + final Class<? extends Key>[] keyType = (Class<? extends Key>[]) new Class[] { IntValue.class }; + + this.recordBuildSideAccesssor = RecordSerializer.get(); + this.recordProbeSideAccesssor = RecordSerializer.get(); + this.recordBuildSideComparator = new RecordComparator(keyPos, keyType); + this.recordProbeSideComparator = new RecordComparator(keyPos, keyType); + this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt(); + + this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE); + this.ioManager = new IOManagerAsync(); + } + + @After + public void afterTest() + { + if (this.ioManager != null) { + this.ioManager.shutdown(); + if (!this.ioManager.isProperlyShutDown()) { + Assert.fail("I/O manager failed to properly shut down."); + } + this.ioManager = null; + } + /* fails the test if any memory taken from the memory manager during the test was not returned */ + if (this.memoryManager != null) { + Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", + this.memoryManager.verifyEmpty()); + this.memoryManager.shutdown(); + this.memoryManager = null; + } + } + + + /** + * Tests behavior with overflow buckets (overflow buckets must be initialized correctly + * when the probe side is re-opened). + */ + @Test + public void testOverflow() { + + int buildSize = 1000; + int probeSize = 1000; + try { + Generator bgen = new Generator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH); + Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); + + final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize); + final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize); + 
doTest(buildInput,probeInput, bgen, pgen); + } + catch (Exception e) { /* NOTE(review): the failure carries only e.getMessage() (the trace goes to stderr); declaring 'throws Exception' would attach the cause */ + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + /** + * Verifies proper operation when the build side is spilled to disk. + */ + @Test + public void testDoubleProbeSpilling() { + + int buildSize = 1000; + int probeSize = 1000; + try { + Generator bgen = new Generator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); + Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); + + final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize); + final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize); + doTest(buildInput,probeInput, bgen, pgen); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + /** + * This test case verifies that the hybrid hash join is able to handle multiple probe phases + * when the build side fits completely into memory. 
+ */ + @Test + public void testDoubleProbeInMemory() { + + int buildSize = 1000; + int probeSize = 1000; + try { + Generator bgen = new Generator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH); + Generator pgen = new Generator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH); + + final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize); + final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize); + + doTest(buildInput,probeInput, bgen, pgen); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + /** Joins both inputs once, then re-opens the probe side NUM_PROBES times; after every pass, each expected match must have been seen by that pass's matcher. */ + private void doTest(TestData.GeneratorIterator buildInput, TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws Exception { + // compute the expected matches with the test helpers (presumably consuming both inputs, hence the resets below) + final Map<Key, Collection<RecordMatch>> expectedFirstMatchesMap = NonReusingHashMatchIteratorITCase.matchRecordValues(NonReusingHashMatchIteratorITCase.collectRecordData(buildInput), NonReusingHashMatchIteratorITCase.collectRecordData(probeInput)); + + final List<Map<Key, Collection<RecordMatch>>> expectedNMatchesMapList = new ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES); + final JoinFunction[] nMatcher = new RecordMatchRemovingJoin[NUM_PROBES]; + for(int i = 0; i < NUM_PROBES; i++) { + Map<Key, Collection<RecordMatch>> tmp; /* one copy per probe pass, since each pass must empty its own collections (see the isEmpty checks below) */ + expectedNMatchesMapList.add(tmp = deepCopy(expectedFirstMatchesMap)); + nMatcher[i] = new RecordMatchRemovingJoin(tmp); + } + + final JoinFunction firstMatcher = new RecordMatchRemovingJoin(expectedFirstMatchesMap); + + final Collector<Record> collector = new DiscardingOutputCollector<Record>(); + + // reset the generators and input iterators before the actual join + bgen.reset(); + pgen.reset(); + buildInput.reset(); + probeInput.reset(); + + // compare with iterator values + NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record> iterator = + new NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>( + 
buildInput, probeInput, this.recordSerializer, this.record1Comparator, + this.recordSerializer, this.record2Comparator, this.recordPairComparator, + this.memoryManager, ioManager, this.parentTask, 1.0); + /* NOTE(review): close() below is not in a finally block, so a failing assertion skips it; memory the iterator presumably holds from memoryManager is then not returned and afterTest() reports a leak as well */ + iterator.open(); + // do first join with both inputs + while (iterator.callWithNextKey(firstMatcher, collector)); + + // assert that each expected match was seen in the first pass + for (Entry<Key, Collection<RecordMatch>> entry : expectedFirstMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + + for(int i = 0; i < NUM_PROBES; i++) { + pgen.reset(); + probeInput.reset(); + // re-open only the probe side .. + iterator.reopenProbe(probeInput); + // .. and join it again against the retained build side + while (iterator.callWithNextKey(nMatcher[i], collector)); + + // assert that each expected match was seen in this pass + for (Entry<Key, Collection<RecordMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + + iterator.close(); + } + + // + // + // Tests adapted from HashTableITCase 
+ // + // + /** Builds the probe input: a UniformRecordGenerator (presumably numKeys keys with probeValsPerKey values each), followed by 5 records for each of the two repeated keys. The 5 is hard-coded and must match REPEATED_VALUE_COUNT_PROBE in the tests. */ + private final MutableObjectIterator<Record> getProbeInput(final int numKeys, + final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) { + MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true); + MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5); + MutableObjectIterator<Record> probe3 = new ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5); + List<MutableObjectIterator<Record>> probes = new ArrayList<MutableObjectIterator<Record>>(); + probes.add(probe1); + probes.add(probe2); + probes.add(probe3); + return new UnionIterator<Record>(probes); + } + + @Test + public void testSpillingHashJoinWithMassiveCollisions() throws IOException + { + // the following two values are known to have a hash-code collision on the initial level. + // we use them to make sure one partition grows over-proportionally large + final int REPEATED_VALUE_1 = 40559; + final int REPEATED_VALUE_2 = 92882; + final int REPEATED_VALUE_COUNT_BUILD = 200000; + final int REPEATED_VALUE_COUNT_PROBE = 5; /* must match the count hard-coded in getProbeInput */ + + final int NUM_KEYS = 1000000; + final int BUILD_VALS_PER_KEY = 3; + final int PROBE_VALS_PER_KEY = 10; + + // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys + MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); + MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD); + MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD); + List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>(); + builds.add(build1); + builds.add(build2); + builds.add(build3); + MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds); + + + + + // allocate the memory for the 
HashTable + List<MemorySegment> memSegments; + try { + memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896); + } + catch (MemoryAllocationException maex) { + fail("Memory for the Join could not be provided."); + return; + } + + // per probe key: number of matching build-side records, summed over all probe passes + HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS); + + // ---------------------------------------------------------------------------------------- + + final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>( + this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, + this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, + memSegments, ioManager); + + for(int probe = 0; probe < NUM_PROBES; probe++) { + // create a probe input that gives 10 million pairs with 10 values sharing a key, plus 5 pairs for each repeated key + MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2); + if(probe == 0) { /* the first pass opens the table with the build input; later passes only re-open the probe side */ + join.open(buildInput, probeInput); + } else { + join.reopenProbe(probeInput); + } + + Record record; + final Record recordReuse = new Record(); + + while (join.nextRecord()) + { + int numBuildValues = 0; + + final Record probeRec = join.getCurrentProbeRecord(); + int key = probeRec.getField(0, IntValue.class).getValue(); + + HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator(); + if ((record = buildSide.next(recordReuse)) != null) { + numBuildValues = 1; + Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); + } + else { + fail("No build side values found for a probe key."); + } + while ((record = buildSide.next(record)) != null) { /* NOTE(review): passes 'record' as the reuse object, whereas testSpillingHashJoinWithTwoRecursions passes 'recordReuse'; confirm which is intended */ + numBuildValues++; + Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); + } + + Long contained = map.get(key); + if (contained == null) { + contained = 
Long.valueOf(numBuildValues); + } + else { + contained = Long.valueOf(contained.longValue() + numBuildValues); + } + + map.put(key, contained); + } + } + + join.close(); + /* NOTE(review): close() above and the release below are not in a finally block; a failing assertion skips them, the pages are not returned, and afterTest() then reports a memory leak as well */ + Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size()); + for (Entry<Integer, Long> entry : map.entrySet()) { + long val = entry.getValue(); + int key = entry.getKey(); + /* every probe record of a key matches every build record of that key, once per probe pass */ + if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) { + Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, + (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val); + } else { + Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, + PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val); + } + } + + + // ---------------------------------------------------------------------------------------- + + this.memoryManager.release(join.getFreedMemory()); + } + + /* + * This test is basically identical to the "testSpillingHashJoinWithMassiveCollisions" test, only that the number + * of repeated values (causing bucket collisions) is large enough to make sure that their target partition no longer + * fits into memory by itself and needs to be repartitioned in the recursion again. NOTE(review): as written, the constants below are identical to those of that test; confirm the intended values. + */ + @Test + public void testSpillingHashJoinWithTwoRecursions() throws IOException + { + // the following two values are known to have a hash-code collision on the first recursion level. 
+ // we use them to make sure one partition grows over-proportionally large + final int REPEATED_VALUE_1 = 40559; + final int REPEATED_VALUE_2 = 92882; + final int REPEATED_VALUE_COUNT_BUILD = 200000; + final int REPEATED_VALUE_COUNT_PROBE = 5; /* must match the count hard-coded in getProbeInput */ + + final int NUM_KEYS = 1000000; + final int BUILD_VALS_PER_KEY = 3; + final int PROBE_VALS_PER_KEY = 10; + + // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys + MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); + MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD); + MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD); + List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>(); + builds.add(build1); + builds.add(build2); + builds.add(build3); + MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds); + + + // allocate the memory for the HashTable + List<MemorySegment> memSegments; + try { + memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896); + } + catch (MemoryAllocationException maex) { + fail("Memory for the Join could not be provided."); + return; + } + + // per probe key: number of matching build-side records, summed over all probe passes + HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS); + + // ---------------------------------------------------------------------------------------- + + final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>( + this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, + this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, + memSegments, ioManager); + for(int probe = 0; probe < NUM_PROBES; probe++) { + // create a probe input that gives 10 million pairs with 10 values sharing a key, plus 5 pairs for each repeated key + 
MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2); + if(probe == 0) { /* the first pass opens the table with the build input; later passes only re-open the probe side */ + join.open(buildInput, probeInput); + } else { + join.reopenProbe(probeInput); + } + Record record; + final Record recordReuse = new Record(); + + while (join.nextRecord()) + { + int numBuildValues = 0; + + final Record probeRec = join.getCurrentProbeRecord(); + int key = probeRec.getField(0, IntValue.class).getValue(); + + HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator(); + if ((record = buildSide.next(recordReuse)) != null) { + numBuildValues = 1; + Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); + } + else { + fail("No build side values found for a probe key."); + } + while ((record = buildSide.next(recordReuse)) != null) { + numBuildValues++; + Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); + } + + Long contained = map.get(key); + if (contained == null) { + contained = Long.valueOf(numBuildValues); + } + else { + contained = Long.valueOf(contained.longValue() + numBuildValues); + } + + map.put(key, contained); + } + } + + join.close(); /* NOTE(review): not in a finally block; a failing assertion skips close() and the release below, and afterTest() then also reports a memory leak */ + Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size()); + for (Entry<Integer, Long> entry : map.entrySet()) { + long val = entry.getValue(); + int key = entry.getKey(); + /* every probe record of a key matches every build record of that key, once per probe pass */ + if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) { + Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, + (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val); + } else { + Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, + PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val); + } + } + + + // 
---------------------------------------------------------------------------------------- + + this.memoryManager.release(join.getFreedMemory()); + } + + /** Copies the map and each of its value collections into new ArrayLists, so that removing matches from the copy leaves the original untouched; the RecordMatch elements themselves are shared, not cloned. */ + static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, Collection<RecordMatch>> expectedSecondMatchesMap) { + Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, Collection<RecordMatch>>(expectedSecondMatchesMap.size()); + for(Entry<Key, Collection<RecordMatch>> entry : expectedSecondMatchesMap.entrySet()) { + List<RecordMatch> matches = new ArrayList<RecordMatch>(entry.getValue().size()); + for(RecordMatch m : entry.getValue()) { + matches.add(m); + } + copy.put(entry.getKey(), matches); + } + return copy; + } + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java deleted file mode 100644 index 71f1979..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java +++ /dev/null @@ -1,534 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.runtime.operators.hash; - -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.record.RecordComparator; -import org.apache.flink.api.common.typeutils.record.RecordPairComparator; -import org.apache.flink.api.common.typeutils.record.RecordSerializer; -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; -import org.apache.flink.runtime.memorymanager.MemoryAllocationException; -import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.operators.hash.HashMatchIteratorITCase.RecordMatch; -import org.apache.flink.runtime.operators.hash.HashMatchIteratorITCase.RecordMatchRemovingJoin; -import org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator; -import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator; -import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; -import org.apache.flink.runtime.operators.testutils.DummyInvokable; -import org.apache.flink.runtime.operators.testutils.TestData; -import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; -import org.apache.flink.runtime.operators.testutils.UnionIterator; -import 
org.apache.flink.runtime.operators.testutils.TestData.Generator; -import org.apache.flink.runtime.operators.testutils.TestData.Key; -import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode; -import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Test specialized hash join that keeps the build side data (in memory and on hard disk) - * This is used for iterative tasks. - */ -@SuppressWarnings("deprecation") -public class ReOpenableHashTableITCase { - - private static final int PAGE_SIZE = 8 * 1024; - private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages. - - private static final long SEED1 = 561349061987311L; - private static final long SEED2 = 231434613412342L; - - private static final int NUM_PROBES = 3; // number of reopenings of hash join - - private final AbstractInvokable parentTask = new DummyInvokable(); - - private IOManager ioManager; - private MemoryManager memoryManager; - - private TypeSerializer<Record> recordSerializer; - private TypeComparator<Record> record1Comparator; - private TypeComparator<Record> record2Comparator; - private TypePairComparator<Record, Record> recordPairComparator; - - - - - private static final AbstractInvokable MEM_OWNER = new DummyInvokable(); - private TypeSerializer<Record> recordBuildSideAccesssor; - private TypeSerializer<Record> recordProbeSideAccesssor; - private TypeComparator<Record> recordBuildSideComparator; - private TypeComparator<Record> recordProbeSideComparator; - private TypePairComparator<Record, Record> pactRecordComparator; - - @SuppressWarnings("unchecked") - @Before - public void beforeTest() - { - this.recordSerializer = RecordSerializer.get(); - - 
this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class}); - this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class}); - this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class}); - - - final int[] keyPos = new int[] {0}; - final Class<? extends Key>[] keyType = (Class<? extends Key>[]) new Class[] { IntValue.class }; - - this.recordBuildSideAccesssor = RecordSerializer.get(); - this.recordProbeSideAccesssor = RecordSerializer.get(); - this.recordBuildSideComparator = new RecordComparator(keyPos, keyType); - this.recordProbeSideComparator = new RecordComparator(keyPos, keyType); - this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt(); - - this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE); - this.ioManager = new IOManagerAsync(); - } - - @After - public void afterTest() - { - if (this.ioManager != null) { - this.ioManager.shutdown(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } - this.ioManager = null; - } - - if (this.memoryManager != null) { - Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", - this.memoryManager.verifyEmpty()); - this.memoryManager.shutdown(); - this.memoryManager = null; - } - } - - - /** - * Test behavior with overflow buckets (Overflow buckets must be initialized correctly - * if the input is reopened again) - */ - @Test - public void testOverflow() { - - int buildSize = 1000; - int probeSize = 1000; - try { - Generator bgen = new Generator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH); - Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); - - final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize); - final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, 
probeSize); - doTest(buildInput,probeInput, bgen, pgen); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - /** - * Verify proper operation if the build side is spilled to disk. - */ - @Test - public void testDoubleProbeSpilling() { - - int buildSize = 1000; - int probeSize = 1000; - try { - Generator bgen = new Generator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); - Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); - - final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize); - final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize); - doTest(buildInput,probeInput, bgen, pgen); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - /** - * This test case verifies that hybrid hash join is able to handle multiple probe phases - * when the build side fits completely into memory. 
- */ - @Test - public void testDoubleProbeInMemory() { - - int buildSize = 1000; - int probeSize = 1000; - try { - Generator bgen = new Generator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH); - Generator pgen = new Generator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH); - - final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize); - final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize); - - doTest(buildInput,probeInput, bgen, pgen); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - private void doTest(TestData.GeneratorIterator buildInput, TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws Exception { - // collect expected data - final Map<TestData.Key, Collection<RecordMatch>> expectedFirstMatchesMap = HashMatchIteratorITCase.matchRecordValues( - HashMatchIteratorITCase.collectRecordData(buildInput), - HashMatchIteratorITCase.collectRecordData(probeInput)); - - final List<Map<TestData.Key, Collection<RecordMatch>>> expectedNMatchesMapList = new ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES); - final JoinFunction[] nMatcher = new RecordMatchRemovingJoin[NUM_PROBES]; - for(int i = 0; i < NUM_PROBES; i++) { - Map<TestData.Key, Collection<RecordMatch>> tmp; - expectedNMatchesMapList.add(tmp = deepCopy(expectedFirstMatchesMap)); - nMatcher[i] = new RecordMatchRemovingJoin(tmp); - } - - final JoinFunction firstMatcher = new RecordMatchRemovingJoin(expectedFirstMatchesMap); - - final Collector<Record> collector = new DiscardingOutputCollector<Record>(); - - // reset the generators - bgen.reset(); - pgen.reset(); - buildInput.reset(); - probeInput.reset(); - - // compare with iterator values - BuildFirstReOpenableHashMatchIterator<Record, Record, Record> iterator = - new BuildFirstReOpenableHashMatchIterator<Record, Record, Record>( - buildInput, probeInput, 
this.recordSerializer, this.record1Comparator, - this.recordSerializer, this.record2Comparator, this.recordPairComparator, - this.memoryManager, ioManager, this.parentTask, 1.0); - - iterator.open(); - // do first join with both inputs - while (iterator.callWithNextKey(firstMatcher, collector)); - - // assert that each expected match was seen for the first input - for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedFirstMatchesMap.entrySet()) { - if (!entry.getValue().isEmpty()) { - Assert.fail("Collection for key " + entry.getKey() + " is not empty"); - } - } - - for(int i = 0; i < NUM_PROBES; i++) { - pgen.reset(); - probeInput.reset(); - // prepare .. - iterator.reopenProbe(probeInput); - // .. and do second join - while (iterator.callWithNextKey(nMatcher[i], collector)); - - // assert that each expected match was seen for the second input - for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) { - if (!entry.getValue().isEmpty()) { - Assert.fail("Collection for key " + entry.getKey() + " is not empty"); - } - } - } - - iterator.close(); - } - - // - // - // Tests taken from HahTableITCase! 
- // - // - - private final MutableObjectIterator<Record> getProbeInput(final int numKeys, - final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) { - MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true); - MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5); - MutableObjectIterator<Record> probe3 = new ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5); - List<MutableObjectIterator<Record>> probes = new ArrayList<MutableObjectIterator<Record>>(); - probes.add(probe1); - probes.add(probe2); - probes.add(probe3); - return new UnionIterator<Record>(probes); - } - - @Test - public void testSpillingHashJoinWithMassiveCollisions() throws IOException - { - // the following two values are known to have a hash-code collision on the initial level. - // we use them to make sure one partition grows over-proportionally large - final int REPEATED_VALUE_1 = 40559; - final int REPEATED_VALUE_2 = 92882; - final int REPEATED_VALUE_COUNT_BUILD = 200000; - final int REPEATED_VALUE_COUNT_PROBE = 5; - - final int NUM_KEYS = 1000000; - final int BUILD_VALS_PER_KEY = 3; - final int PROBE_VALS_PER_KEY = 10; - - // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys - MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); - MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD); - MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD); - List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>(); - builds.add(build1); - builds.add(build2); - builds.add(build3); - MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds); - - - - - // allocate the memory for the 
HashTable - List<MemorySegment> memSegments; - try { - memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896); - } - catch (MemoryAllocationException maex) { - fail("Memory for the Join could not be provided."); - return; - } - - // create the map for validating the results - HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS); - - // ---------------------------------------------------------------------------------------- - - final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>( - this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, - this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, - memSegments, ioManager); - - for(int probe = 0; probe < NUM_PROBES; probe++) { - // create a probe input that gives 10 million pairs with 10 values sharing a key - MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2); - if(probe == 0) { - join.open(buildInput, probeInput); - } else { - join.reopenProbe(probeInput); - } - - Record record; - final Record recordReuse = new Record(); - - while (join.nextRecord()) - { - int numBuildValues = 0; - - final Record probeRec = join.getCurrentProbeRecord(); - int key = probeRec.getField(0, IntValue.class).getValue(); - - HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator(); - if ((record = buildSide.next(recordReuse)) != null) { - numBuildValues = 1; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); - } - else { - fail("No build side values found for a probe key."); - } - while ((record = buildSide.next(record)) != null) { - numBuildValues++; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); - } - - Long contained = map.get(key); - if (contained == null) { - contained = 
Long.valueOf(numBuildValues); - } - else { - contained = Long.valueOf(contained.longValue() + numBuildValues); - } - - map.put(key, contained); - } - } - - join.close(); - - Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size()); - for (Map.Entry<Integer, Long> entry : map.entrySet()) { - long val = entry.getValue(); - int key = entry.getKey(); - - if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) { - Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val); - } else { - Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val); - } - } - - - // ---------------------------------------------------------------------------------------- - - this.memoryManager.release(join.getFreedMemory()); - } - - /* - * This test is basically identical to the "testSpillingHashJoinWithMassiveCollisions" test, only that the number - * of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer - * fits into memory by itself and needs to be repartitioned in the recursion again. - */ - @Test - public void testSpillingHashJoinWithTwoRecursions() throws IOException - { - // the following two values are known to have a hash-code collision on the first recursion level. 
- // we use them to make sure one partition grows over-proportionally large - final int REPEATED_VALUE_1 = 40559; - final int REPEATED_VALUE_2 = 92882; - final int REPEATED_VALUE_COUNT_BUILD = 200000; - final int REPEATED_VALUE_COUNT_PROBE = 5; - - final int NUM_KEYS = 1000000; - final int BUILD_VALS_PER_KEY = 3; - final int PROBE_VALS_PER_KEY = 10; - - // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys - MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); - MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD); - MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD); - List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>(); - builds.add(build1); - builds.add(build2); - builds.add(build3); - MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds); - - - // allocate the memory for the HashTable - List<MemorySegment> memSegments; - try { - memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896); - } - catch (MemoryAllocationException maex) { - fail("Memory for the Join could not be provided."); - return; - } - - // create the map for validating the results - HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS); - - // ---------------------------------------------------------------------------------------- - - final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>( - this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, - this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, - memSegments, ioManager); - for(int probe = 0; probe < NUM_PROBES; probe++) { - // create a probe input that gives 10 million pairs with 10 values sharing a key - 
MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2); - if(probe == 0) { - join.open(buildInput, probeInput); - } else { - join.reopenProbe(probeInput); - } - Record record; - final Record recordReuse = new Record(); - - while (join.nextRecord()) - { - int numBuildValues = 0; - - final Record probeRec = join.getCurrentProbeRecord(); - int key = probeRec.getField(0, IntValue.class).getValue(); - - HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator(); - if ((record = buildSide.next(recordReuse)) != null) { - numBuildValues = 1; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); - } - else { - fail("No build side values found for a probe key."); - } - while ((record = buildSide.next(recordReuse)) != null) { - numBuildValues++; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); - } - - Long contained = map.get(key); - if (contained == null) { - contained = Long.valueOf(numBuildValues); - } - else { - contained = Long.valueOf(contained.longValue() + numBuildValues); - } - - map.put(key, contained); - } - } - - join.close(); - Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size()); - for (Map.Entry<Integer, Long> entry : map.entrySet()) { - long val = entry.getValue(); - int key = entry.getKey(); - - if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) { - Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val); - } else { - Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val); - } - } - - - // 
---------------------------------------------------------------------------------------- - - this.memoryManager.release(join.getFreedMemory()); - } - - - static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, Collection<RecordMatch>> expectedSecondMatchesMap) { - Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, Collection<RecordMatch>>(expectedSecondMatchesMap.size()); - for(Map.Entry<Key, Collection<RecordMatch>> entry : expectedSecondMatchesMap.entrySet()) { - List<RecordMatch> matches = new ArrayList<RecordMatch>(entry.getValue().size()); - for(RecordMatch m : entry.getValue()) { - matches.add(m); - } - copy.put(entry.getKey(), matches); - } - return copy; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java new file mode 100644 index 0000000..18cd8d0 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java @@ -0,0 +1,778 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.runtime.operators.hash; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.record.RecordComparator; +import org.apache.flink.api.common.typeutils.record.RecordPairComparator; +import org.apache.flink.api.common.typeutils.record.RecordSerializer; +import org.apache.flink.api.java.record.functions.JoinFunction; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.runtime.operators.testutils.TestData; +import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator; +import org.apache.flink.runtime.operators.testutils.UnionIterator; +import 
org.apache.flink.runtime.operators.testutils.TestData.Generator; +import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode; +import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode; +import org.apache.flink.runtime.operators.testutils.types.IntPair; +import org.apache.flink.runtime.operators.testutils.types.IntPairComparator; +import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.NullKeyFieldException; +import org.apache.flink.types.Record; +import org.apache.flink.types.Value; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +@SuppressWarnings({"serial", "deprecation"}) +public class ReusingHashMatchIteratorITCase { + + private static final int MEMORY_SIZE = 16000000; // total memory + + private static final int INPUT_1_SIZE = 20000; + private static final int INPUT_2_SIZE = 1000; + + private static final long SEED1 = 561349061987311L; + private static final long SEED2 = 231434613412342L; + + private final AbstractInvokable parentTask = new DummyInvokable(); + + private IOManager ioManager; + private MemoryManager memoryManager; + + private TypeSerializer<Record> recordSerializer; + private TypeComparator<Record> record1Comparator; + private TypeComparator<Record> record2Comparator; + private TypePairComparator<Record, Record> recordPairComparator; + + private TypeSerializer<IntPair> pairSerializer; + private TypeComparator<IntPair> pairComparator; + private TypePairComparator<IntPair, Record> pairRecordPairComparator; + private TypePairComparator<Record, IntPair> recordPairPairComparator; + + + @SuppressWarnings("unchecked") + @Before + public void beforeTest() { + this.recordSerializer = RecordSerializer.get(); + + this.record1Comparator = new RecordComparator(new int[] {0}, 
new Class[] {TestData.Key.class}); + this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class}); + + this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class}); + + this.pairSerializer = new IntPairSerializer(); + this.pairComparator = new IntPairComparator(); + this.pairRecordPairComparator = new IntPairRecordPairComparator(); + this.recordPairPairComparator = new RecordIntPairPairComparator(); + + this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1); + this.ioManager = new IOManagerAsync(); + } + + @After + public void afterTest() { + if (this.ioManager != null) { + this.ioManager.shutdown(); + if (!this.ioManager.isProperlyShutDown()) { + Assert.fail("I/O manager failed to properly shut down."); + } + this.ioManager = null; + } + + if (this.memoryManager != null) { + Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", + this.memoryManager.verifyEmpty()); + this.memoryManager.shutdown(); + this.memoryManager = null; + } + } + + + @Test + public void testBuildFirst() { + try { + Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + + final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); + + // collect expected data + final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues( + collectRecordData(input1), + collectRecordData(input2)); + + final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap); + final Collector<Record> collector = new DiscardingOutputCollector<Record>(); + + // reset the generators + generator1.reset(); + generator2.reset(); + input1.reset(); + input2.reset(); + + // 
compare with iterator values + ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator = + new ReusingBuildFirstHashMatchIterator<Record, Record, Record>( + input1, input2, this.recordSerializer, this.record1Comparator, + this.recordSerializer, this.record2Comparator, this.recordPairComparator, + this.memoryManager, ioManager, this.parentTask, 1.0); + + iterator.open(); + + while (iterator.callWithNextKey(matcher, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + @Test + public void testBuildFirstWithHighNumberOfCommonKeys() + { + // the size of the left and right inputs + final int INPUT_1_SIZE = 200; + final int INPUT_2_SIZE = 100; + + final int INPUT_1_DUPLICATES = 10; + final int INPUT_2_DUPLICATES = 2000; + final int DUPLICATE_KEY = 13; + + try { + Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + + final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); + + final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES); + final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES); + + final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>(); + 
inList1.add(gen1Iter); + inList1.add(const1Iter); + + final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>(); + inList2.add(gen2Iter); + inList2.add(const2Iter); + + MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1); + MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2); + + + // collect expected data + final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues( + collectRecordData(input1), + collectRecordData(input2)); + + // re-create the whole thing for actual processing + + // reset the generators and iterators + generator1.reset(); + generator2.reset(); + const1Iter.reset(); + const2Iter.reset(); + gen1Iter.reset(); + gen2Iter.reset(); + + inList1.clear(); + inList1.add(gen1Iter); + inList1.add(const1Iter); + + inList2.clear(); + inList2.add(gen2Iter); + inList2.add(const2Iter); + + input1 = new UnionIterator<Record>(inList1); + input2 = new UnionIterator<Record>(inList2); + + final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap); + final Collector<Record> collector = new DiscardingOutputCollector<Record>(); + + ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator = + new ReusingBuildFirstHashMatchIterator<Record, Record, Record>( + input1, input2, this.recordSerializer, this.record1Comparator, + this.recordSerializer, this.record2Comparator, this.recordPairComparator, + this.memoryManager, ioManager, this.parentTask, 1.0); + + iterator.open(); + + while (iterator.callWithNextKey(matcher, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + 
} + + @Test + public void testBuildSecond() { + try { + Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + + final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); + + // collect expected data + final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues( + collectRecordData(input1), + collectRecordData(input2)); + + final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap); + final Collector<Record> collector = new DiscardingOutputCollector<Record>(); + + // reset the generators + generator1.reset(); + generator2.reset(); + input1.reset(); + input2.reset(); + + // compare with iterator values + ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator = + new ReusingBuildSecondHashMatchIterator<Record, Record, Record>( + input1, input2, this.recordSerializer, this.record1Comparator, + this.recordSerializer, this.record2Comparator, this.recordPairComparator, + this.memoryManager, ioManager, this.parentTask, 1.0); + + iterator.open(); + + while (iterator.callWithNextKey(matcher, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + @Test + public void testBuildSecondWithHighNumberOfCommonKeys() + { + // the size of the left and right inputs + final int INPUT_1_SIZE = 200; + final int INPUT_2_SIZE = 100; + + final int INPUT_1_DUPLICATES = 10; + final 
int INPUT_2_DUPLICATES = 2000; + final int DUPLICATE_KEY = 13; + + try { + Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + + final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); + + final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES); + final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES); + + final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>(); + inList1.add(gen1Iter); + inList1.add(const1Iter); + + final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>(); + inList2.add(gen2Iter); + inList2.add(const2Iter); + + MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1); + MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2); + + + // collect expected data + final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues( + collectRecordData(input1), + collectRecordData(input2)); + + // re-create the whole thing for actual processing + + // reset the generators and iterators + generator1.reset(); + generator2.reset(); + const1Iter.reset(); + const2Iter.reset(); + gen1Iter.reset(); + gen2Iter.reset(); + + inList1.clear(); + inList1.add(gen1Iter); + inList1.add(const1Iter); + + inList2.clear(); + inList2.add(gen2Iter); + inList2.add(const2Iter); + + input1 = new UnionIterator<Record>(inList1); + input2 = new UnionIterator<Record>(inList2); + + final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap); + 
final Collector<Record> collector = new DiscardingOutputCollector<Record>(); + + ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator = + new ReusingBuildSecondHashMatchIterator<Record, Record, Record>( + input1, input2, this.recordSerializer, this.record1Comparator, + this.recordSerializer, this.record2Comparator, this.recordPairComparator, + this.memoryManager, ioManager, this.parentTask, 1.0); + + iterator.open(); + + while (iterator.callWithNextKey(matcher, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + @Test + public void testBuildFirstWithMixedDataTypes() { + try { + MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false); + + final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); + + // collect expected data + final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues( + collectIntPairData(input1), + collectRecordData(input2)); + + final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap); + final Collector<Record> collector = new DiscardingOutputCollector<Record>(); + + // reset the generators + input1 = new UniformIntPairGenerator(500, 40, false); + generator2.reset(); + input2.reset(); + + // compare with iterator values + ReusingBuildSecondHashMatchIterator<IntPair, Record, Record> iterator = + new ReusingBuildSecondHashMatchIterator<IntPair, Record, Record>( + input1, input2, 
this.pairSerializer, this.pairComparator, + this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator, + this.memoryManager, this.ioManager, this.parentTask, 1.0); + + iterator.open(); + + while (iterator.callWithNextKey(matcher, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + @Test + public void testBuildSecondWithMixedDataTypes() { + try { + MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false); + + final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); + + // collect expected data + final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues( + collectIntPairData(input1), + collectRecordData(input2)); + + final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap); + final Collector<Record> collector = new DiscardingOutputCollector<Record>(); + + // reset the generators + input1 = new UniformIntPairGenerator(500, 40, false); + generator2.reset(); + input2.reset(); + + // compare with iterator values + ReusingBuildFirstHashMatchIterator<IntPair, Record, Record> iterator = + new ReusingBuildFirstHashMatchIterator<IntPair, Record, Record>( + input1, input2, this.pairSerializer, this.pairComparator, + this.recordSerializer, this.record2Comparator, this.recordPairPairComparator, + this.memoryManager, this.ioManager, this.parentTask, 1.0); + + iterator.open(); + + while 
(iterator.callWithNextKey(matcher, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + + + static Map<TestData.Key, Collection<RecordMatch>> matchRecordValues( + Map<TestData.Key, Collection<TestData.Value>> leftMap, + Map<TestData.Key, Collection<TestData.Value>> rightMap) + { + Map<TestData.Key, Collection<RecordMatch>> map = new HashMap<TestData.Key, Collection<RecordMatch>>(); + + for (TestData.Key key : leftMap.keySet()) { + Collection<TestData.Value> leftValues = leftMap.get(key); + Collection<TestData.Value> rightValues = rightMap.get(key); + + if (rightValues == null) { + continue; + } + + if (!map.containsKey(key)) { + map.put(key, new ArrayList<RecordMatch>()); + } + + Collection<RecordMatch> matchedValues = map.get(key); + + for (TestData.Value leftValue : leftValues) { + for (TestData.Value rightValue : rightValues) { + matchedValues.add(new RecordMatch(leftValue, rightValue)); + } + } + } + + return map; + } + + static Map<TestData.Key, Collection<RecordIntPairMatch>> matchRecordIntPairValues( + Map<Integer, Collection<Integer>> leftMap, + Map<TestData.Key, Collection<TestData.Value>> rightMap) + { + final Map<TestData.Key, Collection<RecordIntPairMatch>> map = new HashMap<TestData.Key, Collection<RecordIntPairMatch>>(); + + for (Integer i : leftMap.keySet()) { + + final TestData.Key key = new TestData.Key(i.intValue()); + + final Collection<Integer> leftValues = 
leftMap.get(i); + final Collection<TestData.Value> rightValues = rightMap.get(key); + + if (rightValues == null) { + continue; + } + + if (!map.containsKey(key)) { + map.put(key, new ArrayList<RecordIntPairMatch>()); + } + + final Collection<RecordIntPairMatch> matchedValues = map.get(key); + + for (Integer v : leftValues) { + for (TestData.Value val : rightValues) { + matchedValues.add(new RecordIntPairMatch(v, val)); + } + } + } + + return map; + } + + + static Map<TestData.Key, Collection<TestData.Value>> collectRecordData(MutableObjectIterator<Record> iter) + throws Exception + { + Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>(); + Record pair = new Record(); + + while ((pair = iter.next(pair)) != null) { + + TestData.Key key = pair.getField(0, TestData.Key.class); + if (!map.containsKey(key)) { + map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>()); + } + + Collection<TestData.Value> values = map.get(key); + values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue())); + } + + return map; + } + + static Map<Integer, Collection<Integer>> collectIntPairData(MutableObjectIterator<IntPair> iter) + throws Exception + { + Map<Integer, Collection<Integer>> map = new HashMap<Integer, Collection<Integer>>(); + IntPair pair = new IntPair(); + + while ((pair = iter.next(pair)) != null) { + + final int key = pair.getKey(); + final int value = pair.getValue(); + if (!map.containsKey(key)) { + map.put(key, new ArrayList<Integer>()); + } + + Collection<Integer> values = map.get(key); + values.add(value); + } + + return map; + } + + /** + * Private class used for storage of the expected matches in a hash-map. 
+ */ + static class RecordMatch { + + private final Value left; + private final Value right; + + public RecordMatch(Value left, Value right) { + this.left = left; + this.right = right; + } + + @Override + public boolean equals(Object obj) { + RecordMatch o = (RecordMatch) obj; + return this.left.equals(o.left) && this.right.equals(o.right); + } + + @Override + public int hashCode() { + return this.left.hashCode() ^ this.right.hashCode(); + } + + @Override + public String toString() { + return left + ", " + right; + } + } + + /** + * Private class used for storage of the expected matches in a hash-map. + */ + static class RecordIntPairMatch + { + private final int left; + private final Value right; + + public RecordIntPairMatch(int left, Value right) { + this.left = left; + this.right = right; + } + + @Override + public boolean equals(Object obj) { + RecordIntPairMatch o = (RecordIntPairMatch) obj; + return this.left == o.left && this.right.equals(o.right); + } + + @Override + public int hashCode() { + return this.left ^ this.right.hashCode(); + } + + @Override + public String toString() { + return left + ", " + right; + } + } + + static final class RecordMatchRemovingJoin extends JoinFunction + { + private final Map<TestData.Key, Collection<RecordMatch>> toRemoveFrom; + + protected RecordMatchRemovingJoin(Map<TestData.Key, Collection<RecordMatch>> map) { + this.toRemoveFrom = map; + } + + @Override + public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception + { + TestData.Key key = rec1.getField(0, TestData.Key.class); + TestData.Value value1 = rec1.getField(1, TestData.Value.class); + TestData.Value value2 = rec2.getField(1, TestData.Value.class); + //System.err.println("rec1 key = "+key+" rec2 key= "+rec2.getField(0, TestData.Key.class)); + Collection<RecordMatch> matches = this.toRemoveFrom.get(key); + if (matches == null) { + Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected."); + } + + 
Assert.assertTrue("Produced match was not contained: " + key + " - " + value1 + ":" + value2, + matches.remove(new RecordMatch(value1, value2))); + + if (matches.isEmpty()) { + this.toRemoveFrom.remove(key); + } + } + } + + static final class RecordIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Record, Record> + { + private final Map<TestData.Key, Collection<RecordIntPairMatch>> toRemoveFrom; + + protected RecordIntPairMatchRemovingMatcher(Map<TestData.Key, Collection<RecordIntPairMatch>> map) { + this.toRemoveFrom = map; + } + + @Override + public void join(IntPair rec1, Record rec2, Collector<Record> out) throws Exception + { + final int k = rec1.getKey(); + final int v = rec1.getValue(); + + final TestData.Key key = rec2.getField(0, TestData.Key.class); + final TestData.Value value = rec2.getField(1, TestData.Value.class); + + Assert.assertTrue("Key does not match for matching IntPair Record combination.", k == key.getKey()); + + Collection<RecordIntPairMatch> matches = this.toRemoveFrom.get(key); + if (matches == null) { + Assert.fail("Match " + key + " - " + v + ":" + value + " is unexpected."); + } + + Assert.assertTrue("Produced match was not contained: " + key + " - " + v + ":" + value, + matches.remove(new RecordIntPairMatch(v, value))); + + if (matches.isEmpty()) { + this.toRemoveFrom.remove(key); + } + } + } + + static final class IntPairRecordPairComparator extends TypePairComparator<IntPair, Record> + { + private int reference; + + @Override + public void setReference(IntPair reference) { + this.reference = reference.getKey(); + } + + @Override + public boolean equalToReference(Record candidate) { + try { + final IntValue i = candidate.getField(0, IntValue.class); + return i.getValue() == this.reference; + } catch (NullPointerException npex) { + throw new NullKeyFieldException(); + } + } + + @Override + public int compareToReference(Record candidate) { + try { + final IntValue i = candidate.getField(0, 
IntValue.class); + return i.getValue() - this.reference; + } catch (NullPointerException npex) { + throw new NullKeyFieldException(); + } + } + } + + static final class RecordIntPairPairComparator extends TypePairComparator<Record, IntPair> + { + private int reference; + + @Override + public void setReference(Record reference) { + this.reference = reference.getField(0, IntValue.class).getValue(); + } + + @Override + public boolean equalToReference(IntPair candidate) { + return this.reference == candidate.getKey(); + } + + @Override + public int compareToReference(IntPair candidate) { + return candidate.getKey() - this.reference; + } + } +}
