Repository: spark Updated Branches: refs/heads/master 227821905 -> a8ab2634c
[SPARK-9832] [SQL] add a thread-safe lookup for BytesToBytseMap This patch add a thread-safe lookup for BytesToBytseMap, and use that in broadcasted HashedRelation. Author: Davies Liu <dav...@databricks.com> Closes #8151 from davies/safeLookup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8ab2634 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8ab2634 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8ab2634 Branch: refs/heads/master Commit: a8ab2634c1eee143a4deaf309204df8add727f9e Parents: 2278219 Author: Davies Liu <dav...@databricks.com> Authored: Wed Aug 12 21:26:00 2015 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Wed Aug 12 21:26:00 2015 -0700 ---------------------------------------------------------------------- .../spark/unsafe/map/BytesToBytesMap.java | 30 ++++++++++++++------ .../sql/execution/joins/HashedRelation.scala | 6 ++-- 2 files changed, 26 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a8ab2634/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 87ed47e..5f3a4fc 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -17,25 +17,24 @@ package org.apache.spark.unsafe.map; -import java.lang.Override; -import java.lang.UnsupportedOperationException; +import javax.annotation.Nullable; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import javax.annotation.Nullable; - import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.shuffle.ShuffleMemoryManager; -import org.apache.spark.unsafe.*; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.unsafe.bitset.BitSet; import org.apache.spark.unsafe.hash.Murmur3_x86_32; -import org.apache.spark.unsafe.memory.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.memory.MemoryLocation; +import org.apache.spark.unsafe.memory.TaskMemoryManager; /** * An append-only hash map where keys and values are contiguous regions of bytes. @@ -328,6 +327,20 @@ public final class BytesToBytesMap { Object keyBaseObject, long keyBaseOffset, int keyRowLengthBytes) { + safeLookup(keyBaseObject, keyBaseOffset, keyRowLengthBytes, loc); + return loc; + } + + /** + * Looks up a key, and saves the result in provided `loc`. + * + * This is a thread-safe version of `lookup`, could be used by multiple threads. + */ + public void safeLookup( + Object keyBaseObject, + long keyBaseOffset, + int keyRowLengthBytes, + Location loc) { assert(bitset != null); assert(longArray != null); @@ -343,7 +356,8 @@ public final class BytesToBytesMap { } if (!bitset.isSet(pos)) { // This is a new key. - return loc.with(pos, hashcode, false); + loc.with(pos, hashcode, false); + return; } else { long stored = longArray.get(pos * 2 + 1); if ((int) (stored) == hashcode) { @@ -361,7 +375,7 @@ public final class BytesToBytesMap { keyRowLengthBytes ); if (areEqual) { - return loc; + return; } else { if (enablePerfMetrics) { numHashCollisions++; http://git-wip-us.apache.org/repos/asf/spark/blob/a8ab2634/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index bb333b4..ea02076 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -215,8 +215,10 @@ private[joins] final class UnsafeHashedRelation( if (binaryMap != null) { // Used in Broadcast join - val loc = binaryMap.lookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset, - unsafeKey.getSizeInBytes) + val map = binaryMap // avoid the compiler error + val loc = new map.Location // this could be allocated in stack + binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset, + unsafeKey.getSizeInBytes, loc) if (loc.isDefined) { val buffer = CompactBuffer[UnsafeRow]() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org