Repository: spark Updated Branches: refs/heads/master 123f0041d -> a6f37b074
[SPARK-25456][SQL][TEST] Fix PythonForeachWriterSuite PythonForeachWriterSuite was failing because RowQueue now needs to have a handle on a SparkEnv with a SerializerManager, so added a mock env with a serializer manager. Also fixed a typo in the `finally` that was hiding the real exception. Tested PythonForeachWriterSuite locally, full tests via jenkins. Closes #22452 from squito/SPARK-25456. Authored-by: Imran Rashid <iras...@cloudera.com> Signed-off-by: Imran Rashid <iras...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6f37b07 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6f37b07 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6f37b07 Branch: refs/heads/master Commit: a6f37b0742d87d5c8ee3e134999d665e5719e822 Parents: 123f004 Author: Imran Rashid <iras...@cloudera.com> Authored: Tue Sep 18 16:33:37 2018 -0500 Committer: Imran Rashid <iras...@cloudera.com> Committed: Tue Sep 18 16:33:37 2018 -0500 ---------------------------------------------------------------------- .../execution/python/PythonForeachWriterSuite.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a6f37b07/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala index 07e6034..d02014c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala @@ -19,17 +19,20 @@ package org.apache.spark.sql.execution.python import scala.collection.mutable.ArrayBuffer +import org.mockito.Mockito.when import org.scalatest.concurrent.Eventually +import org.scalatest.mockito.MockitoSugar import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager} +import org.apache.spark.serializer.{JavaSerializer, SerializerManager} import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.python.PythonForeachWriter.UnsafeRowBuffer import org.apache.spark.sql.types.{DataType, IntegerType} import org.apache.spark.util.Utils -class PythonForeachWriterSuite extends SparkFunSuite with Eventually { +class PythonForeachWriterSuite extends SparkFunSuite with Eventually with MockitoSugar { testWithBuffer("UnsafeRowBuffer: iterator blocks when no data is available") { b => b.assertIteratorBlocked() @@ -75,7 +78,7 @@ class PythonForeachWriterSuite extends SparkFunSuite with Eventually { tester = new BufferTester(memBytes, sleepPerRowReadMs) f(tester) } finally { - if (tester == null) tester.close() + if (tester != null) tester.close() } } } @@ -83,7 +86,12 @@ class PythonForeachWriterSuite extends SparkFunSuite with Eventually { class BufferTester(memBytes: Long, sleepPerRowReadMs: Int) { private val buffer = { - val mem = new TestMemoryManager(new SparkConf()) + val mockEnv = mock[SparkEnv] + val conf = new SparkConf() + val serializerManager = new SerializerManager(new JavaSerializer(conf), conf, None) + when(mockEnv.serializerManager).thenReturn(serializerManager) + SparkEnv.set(mockEnv) + val mem = new TestMemoryManager(conf) mem.limit(memBytes) val taskM = new TaskMemoryManager(mem, 0) new UnsafeRowBuffer(taskM, Utils.createTempDir(), 1) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org