[spark] branch branch-2.4 updated: [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite

2020-09-11 dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new be76ee9  [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
be76ee9 is described below

commit be76ee96bdb168dc2b44b25c01c3dd6cf7946f6b
Author: Dongjoon Hyun 
AuthorDate: Fri Sep 11 11:48:34 2020 -0700

[SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite

This PR aims to add `sinkParameters` to check sink options robustly and independently in DataStreamReaderWriterSuite.

`LastOptions.parameters` is designed to capture three cases: `sourceSchema`, `createSource`, and `createSink`. However, `StreamingQuery.stop` invokes `queryExecutionThread.join`, during which `runStream` may still call `createSource` immediately and reset the options that `createSink` stored.
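
As a minimal standalone sketch of that race (hypothetical code, not the actual suite; the real callbacks live in the suite's `DefaultSource`): when both callbacks write to one shared field, whichever runs last wins, and the sink options can vanish before the test asserts on them.

```scala
// Hypothetical standalone sketch of the shared-field problem; the names
// mirror the test suite, but this is not the actual Spark code.
object SharedFieldSketch {
  var parameters: Map[String, String] = null

  // Both provider callbacks funnel their options into the same field.
  def createSink(options: Map[String, String]): Unit = { parameters = options }
  def createSource(options: Map[String, String]): Unit = { parameters = options }

  def main(args: Array[String]): Unit = {
    createSink(Map("opt1" -> "5"))   // start() stores the sink options
    createSource(Map.empty)          // the stream thread clobbers them
    println(parameters.get("opt1"))  // prints None: the sink options are lost
  }
}
```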

To capture the `createSink` options, the test suite currently relies on a workaround pattern, but we have observed occasional flakiness with it. If we store the `createSink` options in a separate field, the workaround becomes unnecessary and the flakiness is eliminated. The current workaround looks like this:

```scala
val query = df.writeStream
  ...
  .start()
assert(LastOptions.parameters(...))
query.stop()
```
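
With a dedicated field for the sink options, `stop()` can be chained immediately and the assertion no longer races with the query lifecycle. A sketch of the updated pattern (a fragment rather than standalone code; `df`, `LastOptions`, and `newMetadataDir` are assumed to come from DataStreamReaderWriterSuite, as in the diff below):

```scala
// Fragment mirroring the updated suite (see the diff below); df, LastOptions,
// and newMetadataDir are defined by DataStreamReaderWriterSuite.
df.writeStream
  .format("org.apache.spark.sql.streaming.test")
  .option("opt1", "5")
  .option("checkpointLocation", newMetadataDir)
  .start()
  .stop()

// sinkParameters is written only by createSink, so stop() cannot reset it.
assert(LastOptions.sinkParameters("opt1") == "5")
```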

Does this PR introduce any user-facing change? No. This is a test-only change.

How was this patch tested? Pass the newly updated test case.

Closes #29730 from dongjoon-hyun/SPARK-32845.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57)
Signed-off-by: Dongjoon Hyun 
---
 .../test/DataStreamReaderWriterSuite.scala | 23 ++++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 4744aca..38d5c74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -43,11 +43,13 @@ object LastOptions {
   var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
   var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
   var parameters: Map[String, String] = null
+  var sinkParameters: Map[String, String] = null
   var schema: Option[StructType] = null
   var partitionColumns: Seq[String] = Nil
 
   def clear(): Unit = {
     parameters = null
+    sinkParameters = null
     schema = null
     partitionColumns = null
     reset(mockStreamSourceProvider)
@@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       parameters: Map[String, String],
       partitionColumns: Seq[String],
       outputMode: OutputMode): Sink = {
-    LastOptions.parameters = parameters
+    LastOptions.sinkParameters = parameters
     LastOptions.partitionColumns = partitionColumns
     LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
     new Sink {
@@ -171,20 +173,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     LastOptions.clear()
 
-    val query = df.writeStream
+    df.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("opt1", "5")
       .options(Map("opt2" -> "4"))
       .options(map)
       .option("checkpointLocation", newMetadataDir)
       .start()
+      .stop()
 
-    assert(LastOptions.parameters("opt1") == "5")
-    assert(LastOptions.parameters("opt2") == "4")
-    assert(LastOptions.parameters("opt3") == "3")
-    assert(LastOptions.parameters.contains("checkpointLocation"))
-
-    query.stop()
+    assert(LastOptions.sinkParameters("opt1") == "5")
+    assert(LastOptions.sinkParameters("opt2") == "4")
+    assert(LastOptions.sinkParameters("opt3") == "3")
+    assert(LastOptions.sinkParameters.contains("checkpointLocation"))
   }
 
   test("SPARK-32832: later option should override earlier options for load()") {
@@ -205,7 +206,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .load()
     assert(LastOptions.parameters.isEmpty)
 
-    val query = ds.writeStream
+    ds.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .option("paTh", "1")
@@ -214,8 +215,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .option("patH", "4")
       .option("path", "5")
       .start()
-    assert(LastOptions.parameters("path") == "5")
-    query.stop()
+      .stop()
+    assert(LastOptions.sinkParameters("path") == "5")
   }
 
   test("partitioning") {

