Repository: spark
Updated Branches:
  refs/heads/master 8fe5d2c39 -> 2c54aae1b
[SPARK-24809][SQL] Serializing LongToUnsafeRowMap in executor may result in data error

When the join key is a long or an int in a broadcast join, Spark uses `LongToUnsafeRowMap` to store the key-value pairs of the table that will be broadcast. But when the `LongToUnsafeRowMap` is broadcast to executors and is too big to hold in memory, it is stored on disk. At that point, because `write` uses the variable `cursor` to determine how many bytes of `page` in `LongToUnsafeRowMap` will be written out, and `cursor` was not restored when deserializing, the executor writes out nothing from `page` to disk.

## What changes were proposed in this pull request?

Restore the cursor value when deserializing.

Author: liulijia <liutang...@yeah.net>

Closes #21772 from liutang123/SPARK-24809.
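To make the failure mode concrete, here is a minimal, self-contained Scala sketch. It is not the real `LongToUnsafeRowMap`: `SimplifiedMap`, its `write`/`read` methods, and the fixed offset value 16 are simplifications invented for illustration (the real class serializes through `writeExternal`/Kryo and reads the offset from `Platform.LONG_ARRAY_OFFSET` at runtime). It only shows the mechanism: `cursor` is advanced by `append`, consulted by `write`, and, before this fix, never restored by `read`, so a second serialization emits an empty page.

object CursorBugSketch {
  // Stand-in for Platform.LONG_ARRAY_OFFSET (16 on typical HotSpot JVMs):
  // `cursor` is an absolute byte offset into the long array, not an element count.
  val LONG_ARRAY_OFFSET = 16L

  class SimplifiedMap {
    var page: Array[Long] = Array.empty[Long]
    // Points one byte past the last used byte of `page`.
    var cursor: Long = LONG_ARRAY_OFFSET

    def append(values: Array[Long]): Unit = {
      page = page ++ values
      cursor += values.length * 8
    }

    // Serialization writes only the bytes below `cursor`.
    def write(): Array[Long] = {
      val usedWords = ((cursor - LONG_ARRAY_OFFSET) / 8).toInt
      page.take(usedWords)
    }

    def read(data: Array[Long], restoreCursor: Boolean): Unit = {
      page = data
      // The SPARK-24809 fix: without this, `cursor` keeps its initial
      // value and a later write() on the executor emits zero words.
      if (restoreCursor) {
        cursor = data.length.toLong * 8 + LONG_ARRAY_OFFSET
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val driverMap = new SimplifiedMap
    driverMap.append(Array(1L, 2L, 3L))
    val broadcast = driverMap.write()                        // 3 words, as expected

    val buggy = new SimplifiedMap
    buggy.read(broadcast, restoreCursor = false)
    println(s"without fix: ${buggy.write().length} words")   // 0 -- page silently lost

    val fixed = new SimplifiedMap
    fixed.read(broadcast, restoreCursor = true)
    println(s"with fix: ${fixed.write().length} words")      // 3 -- spill keeps the data
  }
}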
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c54aae1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c54aae1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c54aae1

Branch: refs/heads/master
Commit: 2c54aae1bc2fa3da26917c89e6201fb2108d9fab
Parents: 8fe5d2c
Author: liulijia <liutang...@yeah.net>
Authored: Sun Jul 29 13:13:00 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Sun Jul 29 13:13:00 2018 -0700

----------------------------------------------------------------------
 .../sql/execution/joins/HashedRelation.scala    |  2 ++
 .../execution/joins/HashedRelationSuite.scala   | 29 ++++++++++++++++++++
 2 files changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c54aae1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 20ce01f..86eb47a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -772,6 +772,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     array = readLongArray(readBuffer, length)
     val pageLength = readLong().toInt
     page = readLongArray(readBuffer, pageLength)
+    // Restore cursor variable to make this map able to be serialized again on executors.
+    cursor = pageLength * 8 + Platform.LONG_ARRAY_OFFSET
   }
 
   override def readExternal(in: ObjectInput): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2c54aae1/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 037cc2e..d9b34dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -278,6 +278,35 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     map.free()
   }
 
+  test("SPARK-24809: Serializing LongToUnsafeRowMap in executor may result in data error") {
+    val unsafeProj = UnsafeProjection.create(Array[DataType](LongType))
+    val originalMap = new LongToUnsafeRowMap(mm, 1)
+
+    val key1 = 1L
+    val value1 = 4852306286022334418L
+
+    val key2 = 2L
+    val value2 = 8813607448788216010L
+
+    originalMap.append(key1, unsafeProj(InternalRow(value1)))
+    originalMap.append(key2, unsafeProj(InternalRow(value2)))
+    originalMap.optimize()
+
+    val ser = sparkContext.env.serializer.newInstance()
+    // Simulate serialize/deserialize twice on driver and executor
+    val firstTimeSerialized = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
+    val secondTimeSerialized =
+      ser.deserialize[LongToUnsafeRowMap](ser.serialize(firstTimeSerialized))
+
+    val resultRow = new UnsafeRow(1)
+    assert(secondTimeSerialized.getValue(key1, resultRow).getLong(0) === value1)
+    assert(secondTimeSerialized.getValue(key2, resultRow).getLong(0) === value2)
+
+    originalMap.free()
+    firstTimeSerialized.free()
+    secondTimeSerialized.free()
+  }
+
   test("Spark-14521") {
     val ser = new KryoSerializer(
       (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
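A note on why the test round-trips twice: the first serialize/deserialize models the driver broadcasting the map to an executor; the second models that executor serializing the map again (for example, when spilling it to disk), which is exactly the step where an unrestored `cursor` dropped the page. The stand-in sketch below mirrors that pattern with plain Java serialization; `StandInMap` and `roundTrip` are hypothetical helpers, not Spark APIs, whereas the real test drives `sparkContext.env.serializer` against an actual `LongToUnsafeRowMap`.

import java.io._

object DoubleRoundTripSketch {
  // Hypothetical stand-in for LongToUnsafeRowMap; Scala case classes
  // are Serializable out of the box.
  case class StandInMap(page: Array[Long])

  def roundTrip[T <: Serializable](x: T): T = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(x)
    oos.flush()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    ois.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val original = StandInMap(Array(1L, 2L, 3L))
    val onExecutor = roundTrip(original)  // models driver -> executor broadcast
    val spilled = roundTrip(onExecutor)   // models the executor serializing again
    // With the fix, the second round trip preserves the data.
    assert(spilled.page.sameElements(original.page))
  }
}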