This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8476c8b [SPARK-38542][SQL] UnsafeHashedRelation should serialize numKeys out 8476c8b is described below commit 8476c8b846ffd2622a6bcf1accf9fa55ffbdc0db Author: mcdull-zhang <work4d...@163.com> AuthorDate: Wed Mar 16 14:17:18 2022 +0800 [SPARK-38542][SQL] UnsafeHashedRelation should serialize numKeys out ### What changes were proposed in this pull request? UnsafeHashedRelation now writes numKeys out when it is serialized and reads it back when it is deserialized. ### Why are the changes needed? One case I found was this: we turned on ReusedExchange(BroadcastExchange), but the returned UnsafeHashedRelation was missing numKeys. The reason is that TorrentBroadcast._value is currently a SoftReference, so the broadcast value can be re-obtained by deserialization; because numKeys was not written out during serialization, the deserialized UnsafeHashedRelation loses it, which leads to incorrect calculation results. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added an assertion to an existing unit test. Closes #35836 from mcdull-zhang/UnsafeHashed. Authored-by: mcdull-zhang <work4d...@163.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 4 +++- .../org/apache/spark/sql/execution/joins/HashedRelationSuite.scala | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 698e7ed..253f16e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -207,7 +207,7 @@ private[execution] class ValueRowWithKeyIndex { * A HashedRelation for UnsafeRow, which is backed BytesToBytesMap. 
* * It's serialized in the following format: - * [number of keys] + * [number of keys] [number of fields] * [size of key] [size of value] [key bytes] [bytes for value] */ private[joins] class UnsafeHashedRelation( @@ -364,6 +364,7 @@ private[joins] class UnsafeHashedRelation( writeInt: (Int) => Unit, writeLong: (Long) => Unit, writeBuffer: (Array[Byte], Int, Int) => Unit) : Unit = { + writeInt(numKeys) writeInt(numFields) // TODO: move these into BytesToBytesMap writeLong(binaryMap.numKeys()) @@ -397,6 +398,7 @@ private[joins] class UnsafeHashedRelation( readInt: () => Int, readLong: () => Long, readBuffer: (Array[Byte], Int, Int) => Unit): Unit = { + numKeys = readInt() numFields = readInt() resultRow = new UnsafeRow(numFields) val nKeys = readLong() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala index 2462fe3..6c87178 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala @@ -93,6 +93,9 @@ class HashedRelationSuite extends SharedSparkSession { assert(hashed2.get(toUnsafe(InternalRow(10))) === null) assert(hashed2.get(unsafeData(2)).toArray === data2) + // SPARK-38542: UnsafeHashedRelation should serialize numKeys out + assert(hashed2.keys().map(_.copy()).forall(_.numFields == 1)) + val os2 = new ByteArrayOutputStream() val out2 = new ObjectOutputStream(os2) hashed2.writeExternal(out2) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org