Repository: spark
Updated Branches:
  refs/heads/master 8fe5d2c39 -> 2c54aae1b


[SPARK-24809][SQL] Serializing LongToUnsafeRowMap in executor may result in 
data error

When the join key is a long or an int in a broadcast join, Spark uses 
`LongToUnsafeRowMap` to store the key-value pairs of the table that will be 
broadcast. When the broadcast `LongToUnsafeRowMap` is too big to hold in 
executor memory, it is spilled to disk. At that point, because `write` uses 
the variable `cursor` to determine how many bytes of the map's `page` to 
write out, and `cursor` was not restored during deserialization, the executor 
wrote out nothing from the page to disk.

## What changes were proposed in this pull request?
Restore the `cursor` value when deserializing.
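
As an illustration of the failure mode, here is a minimal, self-contained 
Scala sketch. The field names `page` and `cursor` mirror `LongToUnsafeRowMap`, 
but everything else (including the offset constant's value) is schematic, not 
Spark's actual implementation:

```scala
// Hypothetical sketch of SPARK-24809, not the real LongToUnsafeRowMap.
object CursorBugSketch {
  val LONG_ARRAY_OFFSET = 16L // stand-in for Platform.LONG_ARRAY_OFFSET

  class SketchMap {
    var page: Array[Long] = Array.empty
    // `cursor` marks the end of valid data in `page`, measured from
    // LONG_ARRAY_OFFSET; append() advances it as rows are added.
    var cursor: Long = LONG_ARRAY_OFFSET

    def append(words: Array[Long]): Unit = {
      page ++= words
      cursor += words.length * 8
    }

    // The write path derives the number of words to emit from `cursor`.
    def serialize(): Array[Long] =
      page.take(((cursor - LONG_ARRAY_OFFSET) / 8).toInt)

    // Before the fix: read restores `page` but leaves `cursor` at its
    // initial value, so a later serialize() on the executor emits nothing.
    def deserializeBuggy(words: Array[Long]): Unit = { page = words }

    // After the fix: read also restores `cursor` from the page length.
    def deserializeFixed(words: Array[Long]): Unit = {
      page = words
      cursor = words.length * 8 + LONG_ARRAY_OFFSET
    }
  }

  def main(args: Array[String]): Unit = {
    val original = new SketchMap
    original.append(Array(1L, 2L, 3L))
    val wire = original.serialize() // 3 words

    val buggy = new SketchMap
    buggy.deserializeBuggy(wire)
    println(buggy.serialize().length) // 0: a second serialization loses data

    val fixed = new SketchMap
    fixed.deserializeFixed(wire)
    println(fixed.serialize().length) // 3: round-trips correctly
  }
}
```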

Author: liulijia <liutang...@yeah.net>

Closes #21772 from liutang123/SPARK-24809.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c54aae1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c54aae1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c54aae1

Branch: refs/heads/master
Commit: 2c54aae1bc2fa3da26917c89e6201fb2108d9fab
Parents: 8fe5d2c
Author: liulijia <liutang...@yeah.net>
Authored: Sun Jul 29 13:13:00 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Sun Jul 29 13:13:00 2018 -0700

----------------------------------------------------------------------
 .../sql/execution/joins/HashedRelation.scala    |  2 ++
 .../execution/joins/HashedRelationSuite.scala   | 29 ++++++++++++++++++++
 2 files changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c54aae1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 20ce01f..86eb47a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -772,6 +772,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     array = readLongArray(readBuffer, length)
     val pageLength = readLong().toInt
     page = readLongArray(readBuffer, pageLength)
+    // Restore cursor variable to make this map able to be serialized again on executors.
+    cursor = pageLength * 8 + Platform.LONG_ARRAY_OFFSET
   }
 
   override def readExternal(in: ObjectInput): Unit = {

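The restored value mirrors the write path: `page` is an array of longs, so 
`pageLength` words span `pageLength * 8` bytes, and `cursor` is an offset 
measured from the array base, `Platform.LONG_ARRAY_OFFSET`. A short sketch of 
the arithmetic (the write-side computation is paraphrased from the class, not 
quoted verbatim):

```scala
import org.apache.spark.unsafe.Platform

// After read() has filled `page` with `pageLength` longs, the end of the
// valid data sits pageLength * 8 bytes past the array base offset.
val pageLength = 1024
val cursor = pageLength * 8 + Platform.LONG_ARRAY_OFFSET

// The write side recovers the word count from `cursor`, so a second
// serialization on an executor now emits the full page instead of nothing.
val usedWords = (cursor - Platform.LONG_ARRAY_OFFSET) / 8
assert(usedWords == pageLength)
```
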
http://git-wip-us.apache.org/repos/asf/spark/blob/2c54aae1/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 037cc2e..d9b34dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -278,6 +278,35 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     map.free()
   }
 
+  test("SPARK-24809: Serializing LongToUnsafeRowMap in executor may result in 
data error") {
+    val unsafeProj = UnsafeProjection.create(Array[DataType](LongType))
+    val originalMap = new LongToUnsafeRowMap(mm, 1)
+
+    val key1 = 1L
+    val value1 = 4852306286022334418L
+
+    val key2 = 2L
+    val value2 = 8813607448788216010L
+
+    originalMap.append(key1, unsafeProj(InternalRow(value1)))
+    originalMap.append(key2, unsafeProj(InternalRow(value2)))
+    originalMap.optimize()
+
+    val ser = sparkContext.env.serializer.newInstance()
+    // Simulate serialize/deserialize twice on driver and executor
+    val firstTimeSerialized = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
+    val secondTimeSerialized =
+      ser.deserialize[LongToUnsafeRowMap](ser.serialize(firstTimeSerialized))
+
+    val resultRow = new UnsafeRow(1)
+    assert(secondTimeSerialized.getValue(key1, resultRow).getLong(0) === value1)
+    assert(secondTimeSerialized.getValue(key2, resultRow).getLong(0) === value2)
+
+    originalMap.free()
+    firstTimeSerialized.free()
+    secondTimeSerialized.free()
+  }
+
   test("Spark-14521") {
     val ser = new KryoSerializer(
       (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()

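To run the new test locally from a Spark checkout, something like 
`build/sbt "sql/testOnly *HashedRelationSuite"` should work (the exact sbt 
selector syntax may vary across Spark/sbt versions).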
