This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new bd3dc2f5 [SPARK-31511][FOLLOW-UP][TEST][SQL] Make BytesToBytesMap iterators thread-safe bd3dc2f5 is described below commit bd3dc2f54d871d152331612c53f586181f4e87fc Author: sychen <syc...@ctrip.com> AuthorDate: Tue Sep 8 11:54:04 2020 +0000 [SPARK-31511][FOLLOW-UP][TEST][SQL] Make BytesToBytesMap iterators thread-safe ### What changes were proposed in this pull request? Before SPARK-31511 was fixed, `BytesToBytesMap` iterator() was not thread-safe and could cause data inaccuracy. We need to add a unit test. ### Why are the changes needed? Increase test coverage to ensure that iterator() is thread-safe. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a unit test. Closes #29669 from cxzl25/SPARK-31511-test. Authored-by: sychen <syc...@ctrip.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/execution/joins/HashedRelationSuite.scala | 39 ++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala index 72e921d..caa7bdf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala @@ -360,6 +360,45 @@ class HashedRelationSuite extends SharedSparkSession { assert(java.util.Arrays.equals(os.toByteArray, os2.toByteArray)) } + test("SPARK-31511: Make BytesToBytesMap iterators thread-safe") { + val ser = sparkContext.env.serializer.newInstance() + val key = Seq(BoundReference(0, LongType, false)) + + val unsafeProj = UnsafeProjection.create( + Seq(BoundReference(0, LongType, false), BoundReference(1, IntegerType, true))) + val rows = (0 until 10000).map(i => unsafeProj(InternalRow(Int.int2long(i), i + 1)).copy()) + val unsafeHashed = 
UnsafeHashedRelation(rows.iterator, key, 1, mm) + + val os = new ByteArrayOutputStream() + val thread1 = new Thread { + override def run(): Unit = { + val out = new ObjectOutputStream(os) + unsafeHashed.asInstanceOf[UnsafeHashedRelation].writeExternal(out) + out.flush() + } + } + + val thread2 = new Thread { + override def run(): Unit = { + val threadOut = new ObjectOutputStream(new ByteArrayOutputStream()) + unsafeHashed.asInstanceOf[UnsafeHashedRelation].writeExternal(threadOut) + threadOut.flush() + } + } + + thread1.start() + thread2.start() + thread1.join() + thread2.join() + + val unsafeHashed2 = ser.deserialize[UnsafeHashedRelation](ser.serialize(unsafeHashed)) + val os2 = new ByteArrayOutputStream() + val out2 = new ObjectOutputStream(os2) + unsafeHashed2.writeExternal(out2) + out2.flush() + assert(java.util.Arrays.equals(os.toByteArray, os2.toByteArray)) + } + // This test require 4G heap to run, should run it manually ignore("build HashedRelation that is larger than 1G") { val unsafeProj = UnsafeProjection.create( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org