[FLINK-3385] [runtime] Fix outer join skipping unprobed partitions

This closes #1680


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c48e346
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c48e346
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c48e346

Branch: refs/heads/master
Commit: 1c48e3462e9ea16615651fc1984e61d5426fa864
Parents: 03923c3
Author: Greg Hogan <c...@greghogan.com>
Authored: Fri Feb 19 14:59:33 2016 -0500
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Feb 23 01:18:42 2016 +0100

----------------------------------------------------------------------
 .../runtime/operators/hash/HashPartition.java   |  9 +-
 .../operators/hash/MutableHashTable.java        | 96 ++++++++++++++------
 .../operators/hash/ReOpenableHashPartition.java | 21 +++--
 .../runtime/operators/hash/HashTableITCase.java | 52 ++++++++++-
 4 files changed, 135 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1c48e346/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 97bef4a..44ee163 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -339,11 +339,14 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
        }
        
        /**
+        * @param keepUnprobedSpilledPartitions If true then partitions that 
were spilled but received no further probe
+        *                                      requests will be retained; used 
for build-side outer joins.
         * @return The number of write-behind buffers reclaimable after this 
method call.
         * 
         * @throws IOException
         */
-       public int finalizeProbePhase(List<MemorySegment> freeMemory, 
List<HashPartition<BT, PT>> spilledPartitions)
+       public int finalizeProbePhase(List<MemorySegment> freeMemory, 
List<HashPartition<BT, PT>> spilledPartitions,
+                       boolean keepUnprobedSpilledPartitions)
        throws IOException
        {
                if (isInMemory()) {
@@ -363,11 +366,11 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
                        this.partitionBuffers = null;
                        return 0;
                }
-               else if (this.probeSideRecordCounter == 0) { 
+               else if (this.probeSideRecordCounter == 0 && 
!keepUnprobedSpilledPartitions) {
                        // partition is empty, no spilled buffers
                        // return the memory buffer
                        
freeMemory.add(this.probeSideBuffer.getCurrentSegment());
-                       
+
                        // delete the spill files
                        this.probeSideChannel.close();
                        this.buildSideChannel.deleteChannel();

http://git-wip-us.apache.org/repos/asf/flink/blob/1c48e346/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 3f00bf9..7d1fc11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -288,12 +288,17 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
         * Iterator over the elements from the probe side.
         */
        protected ProbeIterator<PT> probeIterator;
-       
+
+       /**
+        * The reader for the spilled-file of the build partition that is 
currently read.
+        */
+       private BlockChannelReader<MemorySegment> currentSpilledBuildSide;
+
        /**
         * The reader for the spilled-file of the probe partition that is 
currently read.
         */
        private BlockChannelReader<MemorySegment> currentSpilledProbeSide;
-       
+
        /**
         * The channel enumerator that is used while processing the current 
partition to create
         * channels for the spill partitions it requires.
@@ -558,38 +563,40 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
        }
        
        protected boolean prepareNextPartition() throws IOException {
-               
-               this.probeMatchedPhase = true;
-               this.unmatchedBuildVisited = false;
-               
                // finalize and cleanup the partitions of the current table
                int buffersAvailable = 0;
                for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) {
                        final HashPartition<BT, PT> p = 
this.partitionsBeingBuilt.get(i);
                        p.setFurtherPatitioning(this.furtherPartitioning);
-                       buffersAvailable += 
p.finalizeProbePhase(this.availableMemory, this.partitionsPending);
+                       buffersAvailable += 
p.finalizeProbePhase(this.availableMemory, this.partitionsPending, 
this.buildSideOuterJoin);
                }
-               
+
                this.partitionsBeingBuilt.clear();
                this.writeBehindBuffersAvailable += buffersAvailable;
-               
+
                releaseTable();
 
+               if (this.currentSpilledBuildSide != null) {
+                       this.currentSpilledBuildSide.closeAndDelete();
+                       this.currentSpilledBuildSide = null;
+               }
+
                if (this.currentSpilledProbeSide != null) {
                        this.currentSpilledProbeSide.closeAndDelete();
                        this.currentSpilledProbeSide = null;
                }
 
-               // check if there are pending partitions
-               if (!this.partitionsPending.isEmpty()) {
-                       final HashPartition<BT, PT> p = 
this.partitionsPending.get(0);
-                       
-                       // build the next table
-                       buildTableFromSpilledPartition(p);
+               if (this.partitionsPending.isEmpty()) {
+                       // no more data
+                       return false;
+               }
+
+               // there are pending partitions
+               final HashPartition<BT, PT> p = this.partitionsPending.get(0);
 
-                       // set the probe side - gather memory segments for 
reading
-                       LinkedBlockingQueue<MemorySegment> returnQueue = new 
LinkedBlockingQueue<MemorySegment>();
-                       this.currentSpilledProbeSide = 
this.ioManager.createBlockChannelReader(p.getProbeSideChannel().getChannelID(), 
returnQueue);
+               if (p.probeSideRecordCounter == 0) {
+                       // unprobed spilled partitions are only re-processed 
for a build-side outer join;
+                       // there is no need to create a hash table since there 
are no probe-side records
 
                        List<MemorySegment> memory = new 
ArrayList<MemorySegment>();
                        MemorySegment seg1 = getNextBuffer();
@@ -601,24 +608,55 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
                                }
                        }
                        else {
-                               throw new IllegalStateException("Attempting to 
begin probing of partition without any memory available");
+                               throw new IllegalStateException("Attempting to 
begin reading spilled partition without any memory available");
                        }
 
-                       ChannelReaderInputViewIterator<PT> probeReader = new 
ChannelReaderInputViewIterator<PT>(this.currentSpilledProbeSide,
-                               returnQueue, memory, this.availableMemory, 
this.probeSideSerializer, p.getProbeSideBlockCount());
-                       this.probeIterator.set(probeReader);
+                       this.currentSpilledBuildSide = 
this.ioManager.createBlockChannelReader(p.getBuildSideChannel().getChannelID());
+                       final ChannelReaderInputView inView = new 
HeaderlessChannelReaderInputView(currentSpilledBuildSide, memory,
+                               p.getBuildSideBlockCount(), 
p.getLastSegmentLimit(), false);
+                       final ChannelReaderInputViewIterator<BT> inIter = new 
ChannelReaderInputViewIterator<BT>(inView,
+                               this.availableMemory, this.buildSideSerializer);
+
+                       this.unmatchedBuildIterator = inIter;
 
-                       // unregister the pending partition
                        this.partitionsPending.remove(0);
-                       this.currentRecursionDepth = p.getRecursionLevel() + 1;
-                       
-                       // recursively get the next
-                       return nextRecord();
+
+                       return true;
+               }
+
+               this.probeMatchedPhase = true;
+               this.unmatchedBuildVisited = false;
+
+               // build the next table; memory must be allocated after this 
call
+               buildTableFromSpilledPartition(p);
+
+               // set the probe side - gather memory segments for reading
+               LinkedBlockingQueue<MemorySegment> returnQueue = new 
LinkedBlockingQueue<MemorySegment>();
+               this.currentSpilledProbeSide = 
this.ioManager.createBlockChannelReader(p.getProbeSideChannel().getChannelID(), 
returnQueue);
+
+               List<MemorySegment> memory = new ArrayList<MemorySegment>();
+               MemorySegment seg1 = getNextBuffer();
+               if (seg1 != null) {
+                       memory.add(seg1);
+                       MemorySegment seg2 = getNextBuffer();
+                       if (seg2 != null) {
+                               memory.add(seg2);
+                       }
                }
                else {
-                       // no more data
-                       return false;
+                       throw new IllegalStateException("Attempting to begin 
probing of partition without any memory available");
                }
+
+               ChannelReaderInputViewIterator<PT> probeReader = new 
ChannelReaderInputViewIterator<PT>(this.currentSpilledProbeSide,
+                       returnQueue, memory, this.availableMemory, 
this.probeSideSerializer, p.getProbeSideBlockCount());
+               this.probeIterator.set(probeReader);
+
+               // unregister the pending partition
+               this.partitionsPending.remove(0);
+               this.currentRecursionDepth = p.getRecursionLevel() + 1;
+
+               // recursively get the next
+               return nextRecord();
        }
        
        public boolean nextRecord() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/1c48e346/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
index 84868ff..78fca67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
@@ -61,7 +61,8 @@ public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
 
        @Override
        public int finalizeProbePhase(List<MemorySegment> freeMemory,
-                       List<HashPartition<BT, PT>> spilledPartitions) throws 
IOException {
+                       List<HashPartition<BT, PT>> spilledPartitions,
+                       boolean keepUnprobedSpilledPartitions) throws 
IOException {
                if ( furtherPartitioning || recursionLevel != 0 || isRestored) {
                        if (isInMemory() && initialBuildSideChannel != null && 
!isRestored) {
                                // return the overflow segments
@@ -74,22 +75,22 @@ public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
                                // we already returned the partitionBuffers via 
the returnQueue.
                                return 0; 
                        }
-                       return super.finalizeProbePhase(freeMemory, 
spilledPartitions);
+                       return super.finalizeProbePhase(freeMemory, 
spilledPartitions, keepUnprobedSpilledPartitions);
                }
-               if (!isInMemory() && this.probeSideRecordCounter == 0) { 
+               if (isInMemory()) {
+                       return 0;
+               } else if (this.probeSideRecordCounter == 0 && 
!keepUnprobedSpilledPartitions) {
                        
freeMemory.add(this.probeSideBuffer.getCurrentSegment());
                        // delete the spill files
                        this.probeSideChannel.close();
                        this.probeSideChannel.deleteChannel();
                        return 0;
+               } else {
+                       this.probeSideBuffer.close();
+                       this.probeSideChannel.close(); // finish pending write 
requests.
+                       spilledPartitions.add(this);
+                       return 1;
                }
-               if (isInMemory()) {
-                       return 0;
-               }
-               this.probeSideBuffer.close();
-               this.probeSideChannel.close(); // finish pending write requests.
-               spilledPartitions.add(this);
-               return 1;
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/1c48e346/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index ed709c0..d9d6a25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -677,7 +677,57 @@ public class HashTableITCase {
                
                this.memManager.release(join.getFreedMemory());
        }
-       
+
+       /*
+        * Same test as {@link #testSparseProbeSpilling} but using a build-side 
outer join
+        * that requires spilled build-side records to be returned and counted.
+        */
+       @Test
+       public void testSparseProbeSpillingWithOuterJoin() throws IOException, 
MemoryAllocationException
+       {
+               final int NUM_BUILD_KEYS = 1000000;
+               final int NUM_BUILD_VALS = 1;
+               final int NUM_PROBE_KEYS = 20;
+               final int NUM_PROBE_VALS = 1;
+
+               MutableObjectIterator<Record> buildInput = new 
UniformRecordGenerator(
+                               NUM_BUILD_KEYS, NUM_BUILD_VALS, false);
+
+               // allocate the memory for the HashTable
+               List<MemorySegment> memSegments;
+               try {
+                       memSegments = this.memManager.allocatePages(MEM_OWNER, 
96);
+               }
+               catch (MemoryAllocationException maex) {
+                       fail("Memory for the Join could not be provided.");
+                       return;
+               }
+
+               final MutableHashTable<Record, Record> join = new 
MutableHashTable<Record, Record>(
+                               this.recordBuildSideAccesssor, 
this.recordProbeSideAccesssor,
+                               this.recordBuildSideComparator, 
this.recordProbeSideComparator, this.pactRecordComparator,
+                               memSegments, ioManager);
+               join.open(buildInput, new 
UniformRecordGenerator(NUM_PROBE_KEYS, NUM_PROBE_VALS, true), true);
+
+               int expectedNumResults = (Math.max(NUM_PROBE_KEYS, 
NUM_BUILD_KEYS) * NUM_BUILD_VALS)
+                               * NUM_PROBE_VALS;
+
+               final Record recordReuse = new Record();
+               int numRecordsInJoinResult = 0;
+
+               while (join.nextRecord()) {
+                       MutableObjectIterator<Record> buildSide = 
join.getBuildSideIterator();
+                       while (buildSide.next(recordReuse) != null) {
+                               numRecordsInJoinResult++;
+                       }
+               }
+               Assert.assertEquals("Wrong number of records in join result.", 
expectedNumResults, numRecordsInJoinResult);
+
+               join.close();
+
+               this.memManager.release(join.getFreedMemory());
+       }
+
        /*
         * This test validates a bug fix against former memory loss in the case 
where a partition was spilled
         * during an insert into the same.

Reply via email to