This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new ec45d10 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite ec45d10 is described below commit ec45d10d26621a0541d937bf6850e153b6cd426b Author: Dongjoon Hyun <dongj...@apache.org> AuthorDate: Fri Sep 11 11:48:34 2020 -0700 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite This PR aims to add `sinkParameters` to check sink options robustly and independently in `DataStreamReaderWriterSuite`. `LastOptions.parameters` is designed to capture the options passed in three cases: `sourceSchema`, `createSource`, and `createSink`. However, `StreamQuery.stop` invokes `queryExecutionThread.join`, `runStream`, and `createSource` immediately, which resets the options previously stored by `createSink`. To capture the `createSink` options, the test suite currently uses a workaround pattern (shown below), but we have occasionally observed flakiness with this pattern. If we store the `createSink` options separately, we no longer need this workaround and can eliminate the flakiness. ```scala val query = df.writeStream. ... .start() assert(LastOptions.parameters(..)) query.stop() ``` Does this PR introduce any user-facing change? No. This is a test-only change. How was this patch tested? Pass the newly updated test cases. Closes #29730 from dongjoon-hyun/SPARK-32845. 
Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../test/DataStreamReaderWriterSuite.scala | 23 +++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index d90af35..8bf7e27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -43,11 +43,13 @@ object LastOptions { var mockStreamSourceProvider = mock(classOf[StreamSourceProvider]) var mockStreamSinkProvider = mock(classOf[StreamSinkProvider]) var parameters: Map[String, String] = null + var sinkParameters: Map[String, String] = null var schema: Option[StructType] = null var partitionColumns: Seq[String] = Nil def clear(): Unit = { parameters = null + sinkParameters = null schema = null partitionColumns = null reset(mockStreamSourceProvider) @@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = { - LastOptions.parameters = parameters + LastOptions.sinkParameters = parameters LastOptions.partitionColumns = partitionColumns LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode) (_: Long, _: DataFrame) => {} @@ -169,20 +171,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { LastOptions.clear() - val query = df.writeStream + df.writeStream .format("org.apache.spark.sql.streaming.test") .option("opt1", "5") .options(Map("opt2" -> "4")) .options(map) 
.option("checkpointLocation", newMetadataDir) .start() + .stop() - assert(LastOptions.parameters("opt1") == "5") - assert(LastOptions.parameters("opt2") == "4") - assert(LastOptions.parameters("opt3") == "3") - assert(LastOptions.parameters.contains("checkpointLocation")) - - query.stop() + assert(LastOptions.sinkParameters("opt1") == "5") + assert(LastOptions.sinkParameters("opt2") == "4") + assert(LastOptions.sinkParameters("opt3") == "3") + assert(LastOptions.sinkParameters.contains("checkpointLocation")) } test("SPARK-32832: later option should override earlier options for load()") { @@ -203,7 +204,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .load() assert(LastOptions.parameters.isEmpty) - val query = ds.writeStream + ds.writeStream .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .option("paTh", "1") @@ -212,8 +213,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .option("patH", "4") .option("path", "5") .start() - assert(LastOptions.parameters("path") == "5") - query.stop() + .stop() + assert(LastOptions.sinkParameters("path") == "5") } test("partitioning") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org