Repository: spark
Updated Branches:
  refs/heads/branch-1.5 3b1b8ea3e -> 8229437c3


[SPARK-9832] [SQL] Add a thread-safe lookup for BytesToBytesMap

This patch adds a thread-safe lookup for BytesToBytesMap and uses it in the
broadcasted HashedRelation.

Author: Davies Liu <dav...@databricks.com>

Closes #8151 from davies/safeLookup.

(cherry picked from commit a8ab2634c1eee143a4deaf309204df8add727f9e)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8229437c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8229437c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8229437c

Branch: refs/heads/branch-1.5
Commit: 8229437c31db3a059b48ba26633d9f038cac74b8
Parents: 3b1b8ea
Author: Davies Liu <dav...@databricks.com>
Authored: Wed Aug 12 21:26:00 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Aug 12 21:26:08 2015 -0700

----------------------------------------------------------------------
 .../spark/unsafe/map/BytesToBytesMap.java       | 30 ++++++++++++++------
 .../sql/execution/joins/HashedRelation.scala    |  6 ++--
 2 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8229437c/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 87ed47e..5f3a4fc 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -17,25 +17,24 @@
 
 package org.apache.spark.unsafe.map;
 
-import java.lang.Override;
-import java.lang.UnsupportedOperationException;
+import javax.annotation.Nullable;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
-import javax.annotation.Nullable;
-
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.spark.shuffle.ShuffleMemoryManager;
-import org.apache.spark.unsafe.*;
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.bitset.BitSet;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
-import org.apache.spark.unsafe.memory.*;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryLocation;
+import org.apache.spark.unsafe.memory.TaskMemoryManager;
 
 /**
  * An append-only hash map where keys and values are contiguous regions of 
bytes.
@@ -328,6 +327,20 @@ public final class BytesToBytesMap {
       Object keyBaseObject,
       long keyBaseOffset,
       int keyRowLengthBytes) {
+    safeLookup(keyBaseObject, keyBaseOffset, keyRowLengthBytes, loc);
+    return loc;
+  }
+
+  /**
+   * Looks up a key, and saves the result in provided `loc`.
+   *
+   * This is a thread-safe version of `lookup`, could be used by multiple 
threads.
+   */
+  public void safeLookup(
+      Object keyBaseObject,
+      long keyBaseOffset,
+      int keyRowLengthBytes,
+      Location loc) {
     assert(bitset != null);
     assert(longArray != null);
 
@@ -343,7 +356,8 @@ public final class BytesToBytesMap {
       }
       if (!bitset.isSet(pos)) {
         // This is a new key.
-        return loc.with(pos, hashcode, false);
+        loc.with(pos, hashcode, false);
+        return;
       } else {
         long stored = longArray.get(pos * 2 + 1);
         if ((int) (stored) == hashcode) {
@@ -361,7 +375,7 @@ public final class BytesToBytesMap {
               keyRowLengthBytes
             );
             if (areEqual) {
-              return loc;
+              return;
             } else {
               if (enablePerfMetrics) {
                 numHashCollisions++;

http://git-wip-us.apache.org/repos/asf/spark/blob/8229437c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index bb333b4..ea02076 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -215,8 +215,10 @@ private[joins] final class UnsafeHashedRelation(
 
     if (binaryMap != null) {
       // Used in Broadcast join
-      val loc = binaryMap.lookup(unsafeKey.getBaseObject, 
unsafeKey.getBaseOffset,
-        unsafeKey.getSizeInBytes)
+      val map = binaryMap  // avoid the compiler error
+      val loc = new map.Location  // this could be allocated in stack
+      binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
+        unsafeKey.getSizeInBytes, loc)
       if (loc.isDefined) {
         val buffer = CompactBuffer[UnsafeRow]()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to