Shiva Jahangiri has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3412 )


Change subject: [ASTERIXDB-2577]One frame per spilled partitions
......................................................................

[ASTERIXDB-2577]One frame per spilled partitions

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
During the probe phase, each spilled partition needs one frame
of its own. Otherwise, all of the memory may be used to store
in-memory partitions, and every probe record that matches a
partition spilled during the build phase is flushed directly
to disk. This change fixes the issue: whenever we spill a
partition, or read one back while making space for the hash
table, we account for the one frame reserved for that
partition.
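
The accounting described above can be sketched in isolation. The class
below is a hypothetical, simplified model, not the Hyracks
implementation: the name SpillAccountingSketch and its methods are
invented for illustration, sizes are counted in whole frames, and the
hash table size adjustment performed by the real code is left out.

```java
import java.util.BitSet;

/**
 * Hypothetical model of the free-space accounting, in units of frames.
 * Assumption: hash table memory is ignored to keep the example small.
 */
final class SpillAccountingSketch {
    private final int[] partitionSizeInFrames;
    private final BitSet spilled = new BitSet();
    private long freeFrames;

    SpillAccountingSketch(int memSizeInFrames, int[] partitionSizeInFrames) {
        this.partitionSizeInFrames = partitionSizeInFrames.clone();
        long used = 0;
        for (int size : partitionSizeInFrames) {
            used += size;
        }
        // May be negative: the build side does not fit yet.
        this.freeFrames = memSizeInFrames - used;
    }

    /** Spilling returns the partition's frames, minus the one kept for it. */
    void spill(int pid) {
        if (spilled.get(pid)) {
            throw new IllegalStateException("partition already spilled: " + pid);
        }
        spilled.set(pid);
        freeFrames += partitionSizeInFrames[pid] - 1;
    }

    /** Reading back consumes the partition's frames and releases its one frame. */
    void bringBack(int pid) {
        if (!spilled.get(pid)) {
            throw new IllegalStateException("partition is not spilled: " + pid);
        }
        spilled.clear(pid);
        freeFrames -= partitionSizeInFrames[pid] - 1;
    }

    long freeFrames() {
        return freeFrames;
    }

    /** One frame stays reserved per spilled partition. */
    int reservedFrames() {
        return spilled.cardinality();
    }
}
```

With 10 frames of memory and partitions of 4, 3 and 5 frames, this model
starts 2 frames over budget. Spilling the 5-frame partition frees 4
frames net, which leaves 2 frames free and 1 frame reserved.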

Change-Id: I84ca0ea9d894ad4be0798d725ea5acdbcef0048c
---
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
1 file changed, 13 insertions(+), 4 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/12/3412/1

diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index c78e0dc..ff61368 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -310,7 +310,8 @@
                 long hashTableSizeDecrease =
                         -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
                                 -buildPSizeInTups[pidToSpill], frameSize);
-                freeSpace = freeSpace + bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
+                // -1 because we need to keep one frame for each spilled partition.
+                freeSpace = freeSpace + bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease - 1;
                 inMemTupCount -= buildPSizeInTups[pidToSpill];
                 spillPartition(pidToSpill);
                 closeBuildPartition(pidToSpill);
@@ -334,7 +335,8 @@
                     long expectedHashTableSizeDecrease =
                             -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
                                     -numberOfTuplesToBeSpilled, frameSize);
-                    freeSpace = freeSpace + spaceToBeReturned + expectedHashTableSizeDecrease;
+                    // -1 because we need to keep one frame for each spilled partition.
+                    freeSpace = freeSpace + spaceToBeReturned + expectedHashTableSizeDecrease - 1;
                     // Adjusts the hash table size
                     inMemTupCount -= numberOfTuplesToBeSpilled;
                     if (freeSpace >= 0) {
@@ -358,10 +360,15 @@
             }
             long expectedHashTableByteSizeIncrease = SerializableHashTable
                     .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, buildPSizeInTups[pid], frameSize);
-            freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) - expectedHashTableByteSizeIncrease;
+            // +1 because we need to keep one frame for each spilled partition, but when that partition is read back in
+            // we can release its one frame.
+            freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) - expectedHashTableByteSizeIncrease + 1;
             inMemTupCount += buildPSizeInTups[pid];
             // Adjusts the hash table size
             hashTableByteSizeForInMemTuples += expectedHashTableByteSizeIncrease;
+        }
+        if (memSizeInFrames * ctx.getInitialFrameSize() - freeSpace < spilledStatus.cardinality()) {
+            throw new HyracksDataException("After build finishes, there should be at least one frame for each spilled partition.");
         }

         return inMemTupCount;
@@ -383,8 +390,10 @@
                 continue;
             }
             // We put minus since the method returns a negative value to represent a newly reclaimed space.
+            // -1 because we need to keep one frame for each spilled partition.
             spaceAfterSpill = currentFreeSpace + bufferManager.getPhysicalSize(p) + (-SerializableHashTable
-                    .calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount, -buildPSizeInTups[p], frameSize));
+                    .calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount, -buildPSizeInTups[p], frameSize))
+                    - 1;
             if (spaceAfterSpill == 0) {
                 // Found the perfect one. Just returns this partition.
                 return p;

--
To view, visit https://asterix-gerrit.ics.uci.edu/3412
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I84ca0ea9d894ad4be0798d725ea5acdbcef0048c
Gerrit-Change-Number: 3412
Gerrit-PatchSet: 1
Gerrit-Owner: Shiva Jahangiri <[email protected]>
