[FLINK-3385] [runtime] Fix outer join skipping unprobed partitions

This closes #1680
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c48e346
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c48e346
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c48e346

Branch: refs/heads/master
Commit: 1c48e3462e9ea16615651fc1984e61d5426fa864
Parents: 03923c3
Author: Greg Hogan <c...@greghogan.com>
Authored: Fri Feb 19 14:59:33 2016 -0500
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Feb 23 01:18:42 2016 +0100

----------------------------------------------------------------------
 .../runtime/operators/hash/HashPartition.java   |  9 +-
 .../operators/hash/MutableHashTable.java        | 96 ++++++++++++++------
 .../operators/hash/ReOpenableHashPartition.java | 21 +++--
 .../runtime/operators/hash/HashTableITCase.java | 52 ++++++++++-
 4 files changed, 135 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1c48e346/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 97bef4a..44ee163 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -339,11 +339,14 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	}
 	
 	/**
+	 * @param keepUnprobedSpilledPartitions If true then partitions that were spilled but received no further probe
+	 *        requests will be retained; used for build-side outer joins.
 	 * @return The number of write-behind buffers reclaimable after this method call.
 	 * 
 	 * @throws IOException
 	 */
-	public int finalizeProbePhase(List<MemorySegment> freeMemory, List<HashPartition<BT, PT>> spilledPartitions)
+	public int finalizeProbePhase(List<MemorySegment> freeMemory, List<HashPartition<BT, PT>> spilledPartitions,
+			boolean keepUnprobedSpilledPartitions)
 	throws IOException
 	{
 		if (isInMemory()) {
@@ -363,11 +366,11 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 			this.partitionBuffers = null;
 			return 0;
 		}
-		else if (this.probeSideRecordCounter == 0) {
+		else if (this.probeSideRecordCounter == 0 && !keepUnprobedSpilledPartitions) {
 			// partition is empty, no spilled buffers
 			// return the memory buffer
 			freeMemory.add(this.probeSideBuffer.getCurrentSegment());
-			
+
 			// delete the spill files
 			this.probeSideChannel.close();
 			this.buildSideChannel.deleteChannel();
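
Note on the change above: the essence of the new keepUnprobedSpilledPartitions parameter is that a spilled partition whose probe side never received a record used to be discarded outright. That is correct for an inner join, but it loses records for a build-side outer join, where the partition's build side still has to be emitted even without probe matches. The following is a minimal, hypothetical sketch of just that decision; the class and fields are simplified stand-ins for illustration, not the actual Flink types:

    import java.util.ArrayList;
    import java.util.List;

    // Simplified stand-in for a spilled hash partition, used only to illustrate the new flag.
    class SpilledPartitionSketch {

        long probeSideRecordCounter; // probe-side records that were spilled for this partition
        boolean inMemory;            // true if the partition never spilled

        // Mirrors the decision introduced by this commit: an unprobed spilled partition is
        // only dropped when the join does not need to emit unmatched build-side records.
        int finalizeProbePhase(List<SpilledPartitionSketch> pendingPartitions, boolean keepUnprobedSpilledPartitions) {
            if (inMemory) {
                return 0; // nothing was spilled, nothing to re-read later
            }
            if (probeSideRecordCounter == 0 && !keepUnprobedSpilledPartitions) {
                // inner join: no probe-side record can ever match, so the spill files can be deleted
                return 0;
            }
            // outer join, or a partition that was actually probed: keep it for re-processing
            pendingPartitions.add(this);
            return 1; // one write-behind buffer becomes reclaimable
        }

        public static void main(String[] args) {
            List<SpilledPartitionSketch> pending = new ArrayList<>();
            SpilledPartitionSketch unprobed = new SpilledPartitionSketch();
            unprobed.probeSideRecordCounter = 0;
            System.out.println(unprobed.finalizeProbePhase(pending, false)); // 0, partition dropped
            System.out.println(unprobed.finalizeProbePhase(pending, true));  // 1, partition kept
        }
    }
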
http://git-wip-us.apache.org/repos/asf/flink/blob/1c48e346/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 3f00bf9..7d1fc11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -288,12 +288,17 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 * Iterator over the elements from the probe side.
 	 */
 	protected ProbeIterator<PT> probeIterator;
-	
+
+	/**
+	 * The reader for the spilled-file of the build partition that is currently read.
+	 */
+	private BlockChannelReader<MemorySegment> currentSpilledBuildSide;
+
 	/**
 	 * The reader for the spilled-file of the probe partition that is currently read.
 	 */
 	private BlockChannelReader<MemorySegment> currentSpilledProbeSide;
-	
+
 	/**
 	 * The channel enumerator that is used while processing the current partition to create
 	 * channels for the spill partitions it requires.
@@ -558,38 +563,40 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	}
 	
 	protected boolean prepareNextPartition() throws IOException {
-		
-		this.probeMatchedPhase = true;
-		this.unmatchedBuildVisited = false;
-		
 		// finalize and cleanup the partitions of the current table
 		int buffersAvailable = 0;
 		for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) {
 			final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(i);
 			p.setFurtherPatitioning(this.furtherPartitioning);
-			buffersAvailable += p.finalizeProbePhase(this.availableMemory, this.partitionsPending);
+			buffersAvailable += p.finalizeProbePhase(this.availableMemory, this.partitionsPending, this.buildSideOuterJoin);
 		}
-		
+
 		this.partitionsBeingBuilt.clear();
 		this.writeBehindBuffersAvailable += buffersAvailable;
-		
+
 		releaseTable();
 		
+		if (this.currentSpilledBuildSide != null) {
+			this.currentSpilledBuildSide.closeAndDelete();
+			this.currentSpilledBuildSide = null;
+		}
+
 		if (this.currentSpilledProbeSide != null) {
 			this.currentSpilledProbeSide.closeAndDelete();
 			this.currentSpilledProbeSide = null;
 		}
 		
-		// check if there are pending partitions
-		if (!this.partitionsPending.isEmpty()) {
-			final HashPartition<BT, PT> p = this.partitionsPending.get(0);
-			
-			// build the next table
-			buildTableFromSpilledPartition(p);
+		if (this.partitionsPending.isEmpty()) {
+			// no more data
+			return false;
+		}
+
+		// there are pending partitions
+		final HashPartition<BT, PT> p = this.partitionsPending.get(0);
 			
-			// set the probe side - gather memory segments for reading
-			LinkedBlockingQueue<MemorySegment> returnQueue = new LinkedBlockingQueue<MemorySegment>();
-			this.currentSpilledProbeSide = this.ioManager.createBlockChannelReader(p.getProbeSideChannel().getChannelID(), returnQueue);
+		if (p.probeSideRecordCounter == 0) {
+			// unprobed spilled partitions are only re-processed for a build-side outer join;
+			// there is no need to create a hash table since there are no probe-side records
 			
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			MemorySegment seg1 = getNextBuffer();
@@ -601,24 +608,55 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				}
 			}
 			else {
-				throw new IllegalStateException("Attempting to begin probing of partition without any memory available");
+				throw new IllegalStateException("Attempting to begin reading spilled partition without any memory available");
 			}
 			
-			ChannelReaderInputViewIterator<PT> probeReader = new ChannelReaderInputViewIterator<PT>(this.currentSpilledProbeSide,
-				returnQueue, memory, this.availableMemory, this.probeSideSerializer, p.getProbeSideBlockCount());
-			this.probeIterator.set(probeReader);
+			this.currentSpilledBuildSide = this.ioManager.createBlockChannelReader(p.getBuildSideChannel().getChannelID());
+			final ChannelReaderInputView inView = new HeaderlessChannelReaderInputView(currentSpilledBuildSide, memory,
+				p.getBuildSideBlockCount(), p.getLastSegmentLimit(), false);
+			final ChannelReaderInputViewIterator<BT> inIter = new ChannelReaderInputViewIterator<BT>(inView,
+				this.availableMemory, this.buildSideSerializer);
+
+			this.unmatchedBuildIterator = inIter;
 			
-			// unregister the pending partition
 			this.partitionsPending.remove(0);
-			this.currentRecursionDepth = p.getRecursionLevel() + 1;
-			
-			// recursively get the next
-			return nextRecord();
+
+			return true;
+		}
+
+		this.probeMatchedPhase = true;
+		this.unmatchedBuildVisited = false;
+
+		// build the next table; memory must be allocated after this call
+		buildTableFromSpilledPartition(p);
+
+		// set the probe side - gather memory segments for reading
+		LinkedBlockingQueue<MemorySegment> returnQueue = new LinkedBlockingQueue<MemorySegment>();
+		this.currentSpilledProbeSide = this.ioManager.createBlockChannelReader(p.getProbeSideChannel().getChannelID(), returnQueue);
+
+		List<MemorySegment> memory = new ArrayList<MemorySegment>();
+		MemorySegment seg1 = getNextBuffer();
+		if (seg1 != null) {
+			memory.add(seg1);
+			MemorySegment seg2 = getNextBuffer();
+			if (seg2 != null) {
+				memory.add(seg2);
+			}
 		}
 		else {
-			// no more data
-			return false;
+			throw new IllegalStateException("Attempting to begin probing of partition without any memory available");
 		}
+
+		ChannelReaderInputViewIterator<PT> probeReader = new ChannelReaderInputViewIterator<PT>(this.currentSpilledProbeSide,
+			returnQueue, memory, this.availableMemory, this.probeSideSerializer, p.getProbeSideBlockCount());
+		this.probeIterator.set(probeReader);
+
+		// unregister the pending partition
+		this.partitionsPending.remove(0);
+		this.currentRecursionDepth = p.getRecursionLevel() + 1;
+
+		// recursively get the next
+		return nextRecord();
 	}
 	
 	public boolean nextRecord() throws IOException {


http://git-wip-us.apache.org/repos/asf/flink/blob/1c48e346/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
index 84868ff..78fca67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
@@ -61,7 +61,8 @@ public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
 	
 	@Override
 	public int finalizeProbePhase(List<MemorySegment> freeMemory,
-			List<HashPartition<BT, PT>> spilledPartitions) throws IOException {
+			List<HashPartition<BT, PT>> spilledPartitions,
+			boolean keepUnprobedSpilledPartitions) throws IOException {
 		if ( furtherPartitioning || recursionLevel != 0 || isRestored) {
 			if (isInMemory() && initialBuildSideChannel != null && !isRestored) {
 				// return the overflow segments
@@ -74,22 +75,22 @@ public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
 				// we already returned the partitionBuffers via the returnQueue.
 				return 0;
 			}
-			return super.finalizeProbePhase(freeMemory, spilledPartitions);
+			return super.finalizeProbePhase(freeMemory, spilledPartitions, keepUnprobedSpilledPartitions);
 		}
-		if (!isInMemory() && this.probeSideRecordCounter == 0) {
+		if (isInMemory()) {
+			return 0;
+		} else if (this.probeSideRecordCounter == 0 && !keepUnprobedSpilledPartitions) {
 			freeMemory.add(this.probeSideBuffer.getCurrentSegment());
 			// delete the spill files
 			this.probeSideChannel.close();
 			this.probeSideChannel.deleteChannel();
 			return 0;
+		} else {
+			this.probeSideBuffer.close();
+			this.probeSideChannel.close(); // finish pending write requests.
+			spilledPartitions.add(this);
+			return 1;
 		}
-		if (isInMemory()) {
-			return 0;
-		}
-		this.probeSideBuffer.close();
-		this.probeSideChannel.close(); // finish pending write requests.
-		spilledPartitions.add(this);
-		return 1;
 	}
 	
 	/**
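
The new test below drives the fixed code path end to end. The loop it uses is the standard pattern for consuming a MutableHashTable that was opened as a build-side outer join (third argument of open() set to true): every call to nextRecord() is followed by draining getBuildSideIterator(), which for an outer join also yields the unmatched build-side records, including those from partitions that were spilled but never probed. Shown here in isolation as a rough sketch; the helper class and method name are illustrative only, and constructing the table with its serializers, comparators and memory is elided:

    import java.io.IOException;

    import org.apache.flink.runtime.operators.hash.MutableHashTable;
    import org.apache.flink.util.MutableObjectIterator;

    // Illustrative helper, not part of this commit.
    final class OuterJoinDrainSketch {

        // Counts every build-side record emitted by a table opened with buildOuterJoin = true,
        // e.g. table.open(buildSide, probeSide, true). Matched and unmatched records both count.
        static <BT, PT> long countBuildSideRecords(MutableHashTable<BT, PT> table, BT reuse) throws IOException {
            long count = 0;
            while (table.nextRecord()) {
                MutableObjectIterator<BT> buildSide = table.getBuildSideIterator();
                while (buildSide.next(reuse) != null) {
                    count++;
                }
            }
            return count;
        }
    }

With the sizes used in the test (1,000,000 build keys with one value each, 20 probe keys with one value each), the expected count is max(20, 1000000) * 1 * 1 = 1,000,000: each spilled build-side record must surface exactly once, even though only 20 keys are ever probed. Before this fix, unprobed spilled partitions were dropped in prepareNextPartition(), so those records never reached the result.
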
http://git-wip-us.apache.org/repos/asf/flink/blob/1c48e346/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index ed709c0..d9d6a25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -677,7 +677,57 @@ public class HashTableITCase {
 		
 		this.memManager.release(join.getFreedMemory());
 	}
-	
+
+	/*
+	 * Same test as {@link #testSparseProbeSpilling} but using a build-side outer join
+	 * that requires spilled build-side records to be returned and counted.
+	 */
+	@Test
+	public void testSparseProbeSpillingWithOuterJoin() throws IOException, MemoryAllocationException
+	{
+		final int NUM_BUILD_KEYS = 1000000;
+		final int NUM_BUILD_VALS = 1;
+		final int NUM_PROBE_KEYS = 20;
+		final int NUM_PROBE_VALS = 1;
+
+		MutableObjectIterator<Record> buildInput = new UniformRecordGenerator(
+				NUM_BUILD_KEYS, NUM_BUILD_VALS, false);
+
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			memSegments = this.memManager.allocatePages(MEM_OWNER, 96);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+
+		final MutableHashTable<Record, Record> join = new MutableHashTable<Record, Record>(
+				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
+				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
+				memSegments, ioManager);
+		join.open(buildInput, new UniformRecordGenerator(NUM_PROBE_KEYS, NUM_PROBE_VALS, true), true);
+
+		int expectedNumResults = (Math.max(NUM_PROBE_KEYS, NUM_BUILD_KEYS) * NUM_BUILD_VALS)
+				* NUM_PROBE_VALS;
+
+		final Record recordReuse = new Record();
+		int numRecordsInJoinResult = 0;
+
+		while (join.nextRecord()) {
+			MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
+			while (buildSide.next(recordReuse) != null) {
+				numRecordsInJoinResult++;
+			}
+		}
+		Assert.assertEquals("Wrong number of records in join result.", expectedNumResults, numRecordsInJoinResult);
+
+		join.close();
+
+		this.memManager.release(join.getFreedMemory());
+	}
+
 	/*
 	 * This test validates a bug fix against former memory loss in the case where a partition was spilled
 	 * during an insert into the same.