spark git commit: [SPARK-16922] [SPARK-17211] [SQL] make the address of values portable in LongToUnsafeRowMap
Repository: spark Updated Branches: refs/heads/branch-2.0 c0f1f536d -> 95e44dca1 [SPARK-16922] [SPARK-17211] [SQL] make the address of values portable in LongToUnsafeRowMap ## What changes were proposed in this pull request? In LongToUnsafeRowMap, we use the offset of a value as a pointer, stored in an array and also in the page for chained values. The offset is not portable, because Platform.LONG_ARRAY_OFFSET will be different with different JVM heap sizes, so the deserialized LongToUnsafeRowMap will be corrupted. This PR changes it to use a portable address (without Platform.LONG_ARRAY_OFFSET). ## How was this patch tested? Added a test case with randomly generated keys, to improve the coverage. But this is not a regression test, as it would require a Spark cluster that has at least a 32G heap in the driver or executor. Author: Davies Liu Closes #14927 from davies/longmap. (cherry picked from commit f7e26d788757f917b32749856bb29feb7b4c2987) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95e44dca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95e44dca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95e44dca Branch: refs/heads/branch-2.0 Commit: 95e44dca1d99ff7904c3c2e174f0f2123062ce3c Parents: c0f1f53 Author: Davies Liu Authored: Tue Sep 6 10:46:31 2016 -0700 Committer: Davies Liu Committed: Tue Sep 6 10:46:52 2016 -0700 -- .../sql/execution/joins/HashedRelation.scala| 27 +++--- .../execution/joins/HashedRelationSuite.scala | 56 2 files changed, 75 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/95e44dca/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 0897573..8821c0d 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -447,10 +447,20 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap */ private def nextSlot(pos: Int): Int = (pos + 2) & mask + private[this] def toAddress(offset: Long, size: Int): Long = { +((offset - Platform.LONG_ARRAY_OFFSET) << SIZE_BITS) | size + } + + private[this] def toOffset(address: Long): Long = { +(address >>> SIZE_BITS) + Platform.LONG_ARRAY_OFFSET + } + + private[this] def toSize(address: Long): Int = { +(address & SIZE_MASK).toInt + } + private def getRow(address: Long, resultRow: UnsafeRow): UnsafeRow = { -val offset = address >>> SIZE_BITS -val size = address & SIZE_MASK -resultRow.pointTo(page, offset, size.toInt) +resultRow.pointTo(page, toOffset(address), toSize(address)) resultRow } @@ -485,9 +495,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap var addr = address override def hasNext: Boolean = addr != 0 override def next(): UnsafeRow = { -val offset = addr >>> SIZE_BITS -val size = addr & SIZE_MASK -resultRow.pointTo(page, offset, size.toInt) +val offset = toOffset(addr) +val size = toSize(addr) +resultRow.pointTo(page, offset, size) addr = Platform.getLong(page, offset + size) resultRow } @@ -554,7 +564,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap Platform.putLong(page, cursor, 0) cursor += 8 numValues += 1 -updateIndex(key, (offset.toLong << SIZE_BITS) | row.getSizeInBytes) +updateIndex(key, toAddress(offset, row.getSizeInBytes)) } /** @@ -562,6 +572,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap */ private def updateIndex(key: Long, address: Long): Unit = { var pos = firstSlot(key) +assert(numKeys < array.length / 2) while (array(pos) != key && array(pos + 1) != 0) { pos = nextSlot(pos) } @@ -582,7 +593,7 @@ 
private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap } } else { // there are some values for this key, put the address in the front of them. - val pointer = (address >>> SIZE_BITS) + (address & SIZE_MASK) + val pointer = toOffset(address) + toSize(address) Platform.putLong(page, pointer, array(pos + 1)) array(pos + 1) = address } http://git-wip-us.apache.org/repos/asf/spark/blob/95e44dca/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala -
spark git commit: [SPARK-16922] [SPARK-17211] [SQL] make the address of values portable in LongToUnsafeRowMap
Repository: spark Updated Branches: refs/heads/master bc2767df2 -> f7e26d788 [SPARK-16922] [SPARK-17211] [SQL] make the address of values portable in LongToUnsafeRowMap ## What changes were proposed in this pull request? In LongToUnsafeRowMap, we use the offset of a value as a pointer, stored in an array and also in the page for chained values. The offset is not portable, because Platform.LONG_ARRAY_OFFSET will be different with different JVM heap sizes, so the deserialized LongToUnsafeRowMap will be corrupted. This PR changes it to use a portable address (without Platform.LONG_ARRAY_OFFSET). ## How was this patch tested? Added a test case with randomly generated keys, to improve the coverage. But this is not a regression test, as it would require a Spark cluster that has at least a 32G heap in the driver or executor. Author: Davies Liu Closes #14927 from davies/longmap. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7e26d78 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7e26d78 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7e26d78 Branch: refs/heads/master Commit: f7e26d788757f917b32749856bb29feb7b4c2987 Parents: bc2767d Author: Davies Liu Authored: Tue Sep 6 10:46:31 2016 -0700 Committer: Davies Liu Committed: Tue Sep 6 10:46:31 2016 -0700 -- .../sql/execution/joins/HashedRelation.scala| 27 +++--- .../execution/joins/HashedRelationSuite.scala | 56 2 files changed, 75 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f7e26d78/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 0897573..8821c0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -447,10 +447,20 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap */ private def nextSlot(pos: Int): Int = (pos + 2) & mask + private[this] def toAddress(offset: Long, size: Int): Long = { +((offset - Platform.LONG_ARRAY_OFFSET) << SIZE_BITS) | size + } + + private[this] def toOffset(address: Long): Long = { +(address >>> SIZE_BITS) + Platform.LONG_ARRAY_OFFSET + } + + private[this] def toSize(address: Long): Int = { +(address & SIZE_MASK).toInt + } + private def getRow(address: Long, resultRow: UnsafeRow): UnsafeRow = { -val offset = address >>> SIZE_BITS -val size = address & SIZE_MASK -resultRow.pointTo(page, offset, size.toInt) +resultRow.pointTo(page, toOffset(address), toSize(address)) resultRow } @@ -485,9 +495,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap var addr = address override def hasNext: Boolean = addr != 0 override def next(): UnsafeRow = { -val offset = addr >>> SIZE_BITS -val size = addr & SIZE_MASK -resultRow.pointTo(page, offset, size.toInt) +val offset = toOffset(addr) +val size = toSize(addr) +resultRow.pointTo(page, offset, size) addr = Platform.getLong(page, offset + size) resultRow } @@ -554,7 +564,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap Platform.putLong(page, cursor, 0) cursor += 8 numValues += 1 -updateIndex(key, (offset.toLong << SIZE_BITS) | row.getSizeInBytes) +updateIndex(key, toAddress(offset, row.getSizeInBytes)) } /** @@ -562,6 +572,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap */ private def updateIndex(key: Long, address: Long): Unit = { var pos = firstSlot(key) +assert(numKeys < array.length / 2) while (array(pos) != key && array(pos + 1) != 0) { pos = nextSlot(pos) } @@ -582,7 +593,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap } } else { // 
there are some values for this key, put the address in the front of them. - val pointer = (address >>> SIZE_BITS) + (address & SIZE_MASK) + val pointer = toOffset(address) + toSize(address) Platform.putLong(page, pointer, array(pos + 1)) array(pos + 1) = address } http://git-wip-us.apache.org/repos/asf/spark/blob/f7e26d78/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala -- diff --git a/sql/core/src/test/sca