Repository: spark
Updated Branches:
  refs/heads/branch-1.5 990b4bf7c -> 3137628bc


[SPARK-9548][SQL] Add a destructive iterator for BytesToBytesMap

This pull request adds a destructive iterator to BytesToBytesMap. When used, 
the iterator frees pages as it traverses them. This is part of the effort to 
avoid starving when we have more than one operators that can exhaust memory.

This is based on #7924, but fixes a bug there (Don't use destructive iterator 
in UnsafeKVExternalSorter).

Closes #7924.

Author: Liang-Chi Hsieh <vii...@appier.com>
Author: Reynold Xin <r...@databricks.com>

Closes #8003 from rxin/map-destructive-iterator and squashes the following 
commits:

6b618c3 [Reynold Xin] Don't use destructive iterator in UnsafeKVExternalSorter.
a7bd8ec [Reynold Xin] Merge remote-tracking branch 'viirya/destructive_iter' 
into map-destructive-iterator
7652083 [Liang-Chi Hsieh] For comments: add destructiveIterator(), modify unit 
test, remove code block.
4a3e9de [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into 
destructive_iter
581e9e3 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into 
destructive_iter
f0ff783 [Liang-Chi Hsieh] No need to free last page.
9e9d2a3 [Liang-Chi Hsieh] Add a destructive iterator for BytesToBytesMap.

(cherry picked from commit 21fdfd7d6f89adbd37066c169e6ba9ccd337683e)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3137628b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3137628b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3137628b

Branch: refs/heads/branch-1.5
Commit: 3137628bcd21c5e07b738a54c4ed5664e4a12433
Parents: 990b4bf
Author: Liang-Chi Hsieh <vii...@appier.com>
Authored: Thu Aug 6 14:33:29 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Aug 6 14:33:56 2015 -0700

----------------------------------------------------------------------
 .../spark/unsafe/map/BytesToBytesMap.java       | 33 +++++++++++++++--
 .../map/AbstractBytesToBytesMapSuite.java       | 37 +++++++++++++++++---
 .../UnsafeFixedWidthAggregationMap.java         |  7 ++--
 .../sql/execution/UnsafeKVExternalSorter.java   |  5 ++-
 4 files changed, 71 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3137628b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 2034743..5ac3736 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -227,22 +227,35 @@ public final class BytesToBytesMap {
     private final Iterator<MemoryBlock> dataPagesIterator;
     private final Location loc;
 
-    private MemoryBlock currentPage;
+    private MemoryBlock currentPage = null;
     private int currentRecordNumber = 0;
     private Object pageBaseObject;
     private long offsetInPage;
 
+    // If this iterator destructive or not. When it is true, it frees each 
page as it moves onto
+    // next one.
+    private boolean destructive = false;
+    private BytesToBytesMap bmap;
+
     private BytesToBytesMapIterator(
-        int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) 
{
+        int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc,
+        boolean destructive, BytesToBytesMap bmap) {
       this.numRecords = numRecords;
       this.dataPagesIterator = dataPagesIterator;
       this.loc = loc;
+      this.destructive = destructive;
+      this.bmap = bmap;
       if (dataPagesIterator.hasNext()) {
         advanceToNextPage();
       }
     }
 
     private void advanceToNextPage() {
+      if (destructive && currentPage != null) {
+        dataPagesIterator.remove();
+        this.bmap.taskMemoryManager.freePage(currentPage);
+        this.bmap.shuffleMemoryManager.release(currentPage.size());
+      }
       currentPage = dataPagesIterator.next();
       pageBaseObject = currentPage.getBaseObject();
       offsetInPage = currentPage.getBaseOffset();
@@ -281,7 +294,21 @@ public final class BytesToBytesMap {
    * `lookup()`, the behavior of the returned iterator is undefined.
    */
   public BytesToBytesMapIterator iterator() {
-    return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc);
+    return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, 
false, this);
+  }
+
+  /**
+   * Returns a destructive iterator for iterating over the entries of this 
map. It frees each page
+   * as it moves onto next one. Notice: it is illegal to call any method on 
the map after
+   * `destructiveIterator()` has been called.
+   *
+   * For efficiency, all calls to `next()` will return the same {@link 
Location} object.
+   *
+   * If any other lookups or operations are performed on this map while 
iterating over it, including
+   * `lookup()`, the behavior of the returned iterator is undefined.
+   */
+  public BytesToBytesMapIterator destructiveIterator() {
+    return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, 
true, this);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3137628b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
 
b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 0e23a64..3c50033 100644
--- 
a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ 
b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -183,8 +183,7 @@ public abstract class AbstractBytesToBytesMapSuite {
     }
   }
 
-  @Test
-  public void iteratorTest() throws Exception {
+  private void iteratorTestBase(boolean destructive) throws Exception {
     final int size = 4096;
     BytesToBytesMap map = new BytesToBytesMap(
       taskMemoryManager, shuffleMemoryManager, size / 2, PAGE_SIZE_BYTES);
@@ -216,7 +215,14 @@ public abstract class AbstractBytesToBytesMapSuite {
         }
       }
       final java.util.BitSet valuesSeen = new java.util.BitSet(size);
-      final Iterator<BytesToBytesMap.Location> iter = map.iterator();
+      final Iterator<BytesToBytesMap.Location> iter;
+      if (destructive) {
+        iter = map.destructiveIterator();
+      } else {
+        iter = map.iterator();
+      }
+      int numPages = map.getNumDataPages();
+      int countFreedPages = 0;
       while (iter.hasNext()) {
         final BytesToBytesMap.Location loc = iter.next();
         Assert.assertTrue(loc.isDefined());
@@ -228,11 +234,22 @@ public abstract class AbstractBytesToBytesMapSuite {
         if (keyLength == 0) {
           Assert.assertTrue("value " + value + " was not divisible by 5", 
value % 5 == 0);
         } else {
-        final long key = PlatformDependent.UNSAFE.getLong(
-          keyAddress.getBaseObject(), keyAddress.getBaseOffset());
+          final long key = PlatformDependent.UNSAFE.getLong(
+            keyAddress.getBaseObject(), keyAddress.getBaseOffset());
           Assert.assertEquals(value, key);
         }
         valuesSeen.set((int) value);
+        if (destructive) {
+          // The iterator moves onto next page and frees previous page
+          if (map.getNumDataPages() < numPages) {
+            numPages = map.getNumDataPages();
+            countFreedPages++;
+          }
+        }
+      }
+      if (destructive) {
+        // Latest page is not freed by iterator but by map itself
+        Assert.assertEquals(countFreedPages, numPages - 1);
       }
       Assert.assertEquals(size, valuesSeen.cardinality());
     } finally {
@@ -241,6 +258,16 @@ public abstract class AbstractBytesToBytesMapSuite {
   }
 
   @Test
+  public void iteratorTest() throws Exception {
+    iteratorTestBase(false);
+  }
+
+  @Test
+  public void destructiveIteratorTest() throws Exception {
+    iteratorTestBase(true);
+  }
+
+  @Test
   public void iteratingOverDataPagesWithWastedSpace() throws Exception {
     final int NUM_ENTRIES = 1000 * 1000;
     final int KEY_LENGTH = 24;

http://git-wip-us.apache.org/repos/asf/spark/blob/3137628b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 0245803..efb3353 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -154,14 +154,17 @@ public final class UnsafeFixedWidthAggregationMap {
   }
 
   /**
-   * Returns an iterator over the keys and values in this map.
+   * Returns an iterator over the keys and values in this map. This uses 
destructive iterator of
+   * BytesToBytesMap. So it is illegal to call any other method on this map 
after `iterator()` has
+   * been called.
    *
    * For efficiency, each call returns the same object.
    */
   public KVIterator<UnsafeRow, UnsafeRow> iterator() {
     return new KVIterator<UnsafeRow, UnsafeRow>() {
 
-      private final BytesToBytesMap.BytesToBytesMapIterator 
mapLocationIterator = map.iterator();
+      private final BytesToBytesMap.BytesToBytesMapIterator 
mapLocationIterator =
+        map.destructiveIterator();
       private final UnsafeRow key = new UnsafeRow();
       private final UnsafeRow value = new UnsafeRow();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3137628b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 6c1cf13..9a65c9d 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -88,8 +88,11 @@ public final class UnsafeKVExternalSorter {
       final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
         taskMemoryManager, recordComparator, prefixComparator, Math.max(1, 
map.numElements()));
 
-      final int numKeyFields = keySchema.size();
+      // We cannot use the destructive iterator here because we are reusing 
the existing memory
+      // pages in BytesToBytesMap to hold records during sorting.
+      // The only new memory we are allocating is the pointer/prefix array.
       BytesToBytesMap.BytesToBytesMapIterator iter = map.iterator();
+      final int numKeyFields = keySchema.size();
       UnsafeRow row = new UnsafeRow();
       while (iter.hasNext()) {
         final BytesToBytesMap.Location loc = iter.next();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to