spark git commit: [SPARK-16922] [SPARK-17211] [SQL] make the address of values portable in LongToUnsafeRowMap

2016-09-06 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c0f1f536d -> 95e44dca1


[SPARK-16922] [SPARK-17211] [SQL] make the address of values portable in LongToUnsafeRowMap

## What changes were proposed in this pull request?

In LongToUnsafeRowMap, we use the offset of a value as its pointer; these pointers are stored in the index array and also inside the page, to chain multiple values under the same key. Such an offset is not portable, because Platform.LONG_ARRAY_OFFSET differs between JVMs with different heap sizes, so a LongToUnsafeRowMap deserialized on another JVM can be corrupt.
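
For context, Platform.LONG_ARRAY_OFFSET is just the base offset of a long[] as reported by sun.misc.Unsafe, a per-JVM value rather than a fixed constant. A standalone sketch to observe it (the object name is illustrative, not part of this patch):

```scala
import java.lang.reflect.Field
import sun.misc.Unsafe

// Illustrative only: prints the long[] base offset that backs
// Platform.LONG_ARRAY_OFFSET on the current JVM.
object ShowBaseOffset {
  def main(args: Array[String]): Unit = {
    val f: Field = classOf[Unsafe].getDeclaredField("theUnsafe")
    f.setAccessible(true)
    val unsafe = f.get(null).asInstanceOf[Unsafe]
    // Typically 16 with compressed oops (heaps < 32 GB) and 24 without,
    // which is why absolute offsets break when moved across JVMs.
    println(unsafe.arrayBaseOffset(classOf[Array[Long]]))
  }
}
```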

This PR changes the map to store a portable address instead, one that does not include Platform.LONG_ARRAY_OFFSET.
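
As a rough illustration of the new encoding (a minimal sketch, assuming the same 64-bit layout as LongToUnsafeRowMap: high bits for the offset, low SIZE_BITS bits for the size; the 28-bit constant mirrors HashedRelation.scala, and `arrayBase` stands in for Platform.LONG_ARRAY_OFFSET):

```scala
// Sketch only: constants mirror LongToUnsafeRowMap; arrayBase stands in
// for the JVM-specific Platform.LONG_ARRAY_OFFSET.
object PortableAddress {
  val SIZE_BITS = 28                 // low 28 bits of an address hold the size
  val SIZE_MASK = 0xfffffffL         // == (1L << SIZE_BITS) - 1

  // Encode the offset *relative* to the array base, so no JVM-specific
  // constant is baked into the serialized address.
  def toAddress(offset: Long, size: Int, arrayBase: Long): Long =
    ((offset - arrayBase) << SIZE_BITS) | size

  // Decode by adding the *current* JVM's array base back.
  def toOffset(address: Long, arrayBase: Long): Long =
    (address >>> SIZE_BITS) + arrayBase

  def toSize(address: Long): Int =
    (address & SIZE_MASK).toInt
}
```

Under the old scheme the absolute offset (embedding a base of, say, 16) was serialized directly, so a JVM whose base is 24 would read every row 8 bytes off; with the relative encoding each side applies its own base and the decoded offset stays correct.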

## How was this patch tested?

Added a test case with randomly generated keys to improve coverage. It is not a regression test, though: reproducing the original bug would require a Spark cluster whose driver or executor has at least 32 GB of heap (enough to disable compressed oops and change Platform.LONG_ARRAY_OFFSET).
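
The round trip being exercised looks roughly like this (a condensed sketch in the spirit of the added suite test, assuming it runs inside the org.apache.spark.sql.execution.joins package where LongToUnsafeRowMap is visible):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.LongType

val mm = new TaskMemoryManager(
  new StaticMemoryManager(
    new SparkConf().set("spark.memory.offHeap.enabled", "false"),
    Long.MaxValue, Long.MaxValue, 1),
  0)
val toUnsafe = UnsafeProjection.create(Seq(BoundReference(0, LongType, false)))

// Build a map from randomly generated keys.
val keys = Seq.fill(1000)(Random.nextLong())
val map = new LongToUnsafeRowMap(mm, 64)
keys.foreach(k => map.append(k, toUnsafe(InternalRow(k))))
map.optimize()

// Serialize and deserialize, as a broadcast join would across JVMs.
val bytes = new ByteArrayOutputStream()
val out = new ObjectOutputStream(bytes)
map.writeExternal(out)
out.flush()
val map2 = new LongToUnsafeRowMap(mm, 1)
map2.readExternal(new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)))

// Every key must still resolve to its row after the round trip.
val resultRow = new UnsafeRow(1)
keys.foreach { k =>
  val it = map2.get(k, resultRow)
  assert(it != null && it.hasNext && it.next().getLong(0) == k)
}
map.free()
map2.free()
```

Within a single JVM the array base is identical on both ends, so this checks the new encoding but cannot reproduce the cross-JVM corruption itself; that is why a true regression test would need the 32 GB heap mentioned above.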

Author: Davies Liu 

Closes #14927 from davies/longmap.

(cherry picked from commit f7e26d788757f917b32749856bb29feb7b4c2987)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95e44dca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95e44dca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95e44dca

Branch: refs/heads/branch-2.0
Commit: 95e44dca1d99ff7904c3c2e174f0f2123062ce3c
Parents: c0f1f53
Author: Davies Liu 
Authored: Tue Sep 6 10:46:31 2016 -0700
Committer: Davies Liu 
Committed: Tue Sep 6 10:46:52 2016 -0700

--
 .../sql/execution/joins/HashedRelation.scala| 27 +++---
 .../execution/joins/HashedRelationSuite.scala   | 56 ++++++++++++
 2 files changed, 75 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/95e44dca/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 0897573..8821c0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -447,10 +447,20 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    */
   private def nextSlot(pos: Int): Int = (pos + 2) & mask
 
+  private[this] def toAddress(offset: Long, size: Int): Long = {
+    ((offset - Platform.LONG_ARRAY_OFFSET) << SIZE_BITS) | size
+  }
+
+  private[this] def toOffset(address: Long): Long = {
+    (address >>> SIZE_BITS) + Platform.LONG_ARRAY_OFFSET
+  }
+
+  private[this] def toSize(address: Long): Int = {
+    (address & SIZE_MASK).toInt
+  }
+
   private def getRow(address: Long, resultRow: UnsafeRow): UnsafeRow = {
-    val offset = address >>> SIZE_BITS
-    val size = address & SIZE_MASK
-    resultRow.pointTo(page, offset, size.toInt)
+    resultRow.pointTo(page, toOffset(address), toSize(address))
     resultRow
   }
 
@@ -485,9 +495,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
       var addr = address
       override def hasNext: Boolean = addr != 0
       override def next(): UnsafeRow = {
-        val offset = addr >>> SIZE_BITS
-        val size = addr & SIZE_MASK
-        resultRow.pointTo(page, offset, size.toInt)
+        val offset = toOffset(addr)
+        val size = toSize(addr)
+        resultRow.pointTo(page, offset, size)
         addr = Platform.getLong(page, offset + size)
         resultRow
       }
@@ -554,7 +564,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     Platform.putLong(page, cursor, 0)
     cursor += 8
     numValues += 1
-    updateIndex(key, (offset.toLong << SIZE_BITS) | row.getSizeInBytes)
+    updateIndex(key, toAddress(offset, row.getSizeInBytes))
   }
 
   /**
@@ -562,6 +572,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    */
   private def updateIndex(key: Long, address: Long): Unit = {
     var pos = firstSlot(key)
+    assert(numKeys < array.length / 2)
     while (array(pos) != key && array(pos + 1) != 0) {
       pos = nextSlot(pos)
     }
@@ -582,7 +593,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
       }
     } else {
       // there are some values for this key, put the address in the front of them.
-      val pointer = (address >>> SIZE_BITS) + (address & SIZE_MASK)
+      val pointer = toOffset(address) + toSize(address)
       Platform.putLong(page, pointer, array(pos + 1))
       array(pos + 1) = address
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/95e44dca/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
--

spark git commit: [SPARK-16922] [SPARK-17211] [SQL] make the address of values portable in LongToUnsafeRowMap

2016-09-06 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master bc2767df2 -> f7e26d788


[SPARK-16922] [SPARK-17211] [SQL] make the address of values portable in LongToUnsafeRowMap

## What changes were proposed in this pull request?

In LongToUnsafeRowMap, we use the offset of a value as its pointer; these pointers are stored in the index array and also inside the page, to chain multiple values under the same key. Such an offset is not portable, because Platform.LONG_ARRAY_OFFSET differs between JVMs with different heap sizes, so a LongToUnsafeRowMap deserialized on another JVM can be corrupt.

This PR changes the map to store a portable address instead, one that does not include Platform.LONG_ARRAY_OFFSET.

## How was this patch tested?

Added a test case with randomly generated keys to improve coverage. It is not a regression test, though: reproducing the original bug would require a Spark cluster whose driver or executor has at least 32 GB of heap (enough to disable compressed oops and change Platform.LONG_ARRAY_OFFSET).

Author: Davies Liu 

Closes #14927 from davies/longmap.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7e26d78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7e26d78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7e26d78

Branch: refs/heads/master
Commit: f7e26d788757f917b32749856bb29feb7b4c2987
Parents: bc2767d
Author: Davies Liu 
Authored: Tue Sep 6 10:46:31 2016 -0700
Committer: Davies Liu 
Committed: Tue Sep 6 10:46:31 2016 -0700

--
 .../sql/execution/joins/HashedRelation.scala| 27 +++---
 .../execution/joins/HashedRelationSuite.scala   | 56 ++++++++++++
 2 files changed, 75 insertions(+), 8 deletions(-)
--

