[spark] branch master updated (4269c2c -> ce566be)
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 4269c2c  [SPARK-32851][SQL][TEST] Tests should fail if errors happen when generating projection code
     add ce566be  [SPARK-32180][FOLLOWUP] Fix .rst error in new Pyspark installation guide

No new revisions were added by this update.

Summary of changes:
 python/docs/source/getting_started/installation.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
[spark] branch master updated (b4be6a6 -> 4269c2c)
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from b4be6a6  [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
     add 4269c2c  [SPARK-32851][SQL][TEST] Tests should fail if errors happen when generating projection code

No new revisions were added by this update.

Summary of changes:
 .../src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala | 2 ++
 sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2 ++
 2 files changed, 4 insertions(+)
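[Editor's note] The digest above shows only that two lines were added to each shared test session, so the actual change is not visible here. As a hedged sketch of the likely mechanism — making a test SparkSession fail loudly when projection code generation breaks instead of silently falling back to interpreted evaluation — one could pin the relevant SQL configs when building the session. Both config keys below are real Spark SQL settings, but applying them this way, and the builder boilerplate around them, are assumptions for illustration, not the commit's verbatim diff.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Hypothetical sketch only: the two config keys exist in Spark SQL, but wiring
// them into a test session like this is an assumption, not the SPARK-32851 patch.
object StrictCodegenSession {
  val conf = new SparkConf()
    // Do not fall back to interpreted mode when codegen fails; a broken
    // generated projection should fail the test instead of hiding.
    .set("spark.sql.codegen.fallback", "false")
    // Always take the codegen path for expression evaluation, so errors in
    // generated code are exercised on every run.
    .set("spark.sql.codegen.factoryMode", "CODEGEN_ONLY")

  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("strict-codegen-test-session")
    .config(conf)
    .getOrCreate()
}
```

With a session configured this way, a codegen error surfaces as a test failure rather than a silent switch to the interpreted path — which matches the commit title's intent.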
[spark] branch branch-2.4 updated: [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new be76ee9  [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite

be76ee9 is described below

commit be76ee96bdb168dc2b44b25c01c3dd6cf7946f6b
Author: Dongjoon Hyun
AuthorDate: Fri Sep 11 11:48:34 2020 -0700

    [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite

    This PR aims to add `sinkParameter` to check sink options robustly and independently in DataStreamReaderWriterSuite.

    `LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, and `createSink`. However, `StreamingQuery.stop` invokes `queryExecutionThread.join`, `runStream`, and `createSource` immediately, which resets the options stored by `createSink`. To catch the `createSink` options, the test suite currently uses a workaround pattern, and we have observed occasional flakiness with it. If we store the `createSink` options separately, the workaround is unnecessary and the flakiness is eliminated.

    ```scala
    val query = df.writeStream. ... .start()
    assert(LastOptions.parameters(..))
    query.stop()
    ```

    Does this PR introduce any user-facing change?

    No. This is a test-only change.

    How was this patch tested?

    Pass the newly updated test case.

    Closes #29730 from dongjoon-hyun/SPARK-32845.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57)
    Signed-off-by: Dongjoon Hyun
---
 .../test/DataStreamReaderWriterSuite.scala | 23 +++---
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 4744aca..38d5c74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -43,11 +43,13 @@ object LastOptions {
   var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
   var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
   var parameters: Map[String, String] = null
+  var sinkParameters: Map[String, String] = null
   var schema: Option[StructType] = null
   var partitionColumns: Seq[String] = Nil
 
   def clear(): Unit = {
     parameters = null
+    sinkParameters = null
     schema = null
     partitionColumns = null
     reset(mockStreamSourceProvider)
@@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       parameters: Map[String, String],
       partitionColumns: Seq[String],
       outputMode: OutputMode): Sink = {
-    LastOptions.parameters = parameters
+    LastOptions.sinkParameters = parameters
     LastOptions.partitionColumns = partitionColumns
     LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
     new Sink {
@@ -171,20 +173,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     LastOptions.clear()
 
-    val query = df.writeStream
+    df.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("opt1", "5")
       .options(Map("opt2" -> "4"))
       .options(map)
       .option("checkpointLocation", newMetadataDir)
       .start()
+      .stop()
 
-    assert(LastOptions.parameters("opt1") == "5")
-    assert(LastOptions.parameters("opt2") == "4")
-    assert(LastOptions.parameters("opt3") == "3")
-    assert(LastOptions.parameters.contains("checkpointLocation"))
-
-    query.stop()
+    assert(LastOptions.sinkParameters("opt1") == "5")
+    assert(LastOptions.sinkParameters("opt2") == "4")
+    assert(LastOptions.sinkParameters("opt3") == "3")
+    assert(LastOptions.sinkParameters.contains("checkpointLocation"))
   }
 
   test("SPARK-32832: later option should override earlier options for load()") {
@@ -205,7 +206,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .load()
     assert(LastOptions.parameters.isEmpty)
 
-    val query = ds.writeStream
+    ds.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .option("paTh", "1")
@@ -214,8 +215,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .option("patH", "4")
       .option("path", "5")
       .start()
-    assert(LastOptions.parameters("path") == "5")
-    query.stop()
+      .stop()
+    assert(LastOptions.sinkParameters("path") == "5")
   }
 
   test("partitioning") {
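[Editor's note] The commit message above describes a race: stopping the query re-enters `createSource`, which overwrites the single `LastOptions.parameters` slot that `createSink` has just populated. The following minimal, self-contained sketch (plain Scala, no Spark dependency; all names are illustrative, not the suite's actual code) reproduces that failure shape and shows why a dedicated `sinkParameters` slot makes the assertions stable.

```scala
// Illustrative sketch of the race fixed by SPARK-32845. Names are hypothetical;
// this is the shape of the problem, not the suite's real code.
object SinkOptionRaceDemo extends App {
  object LastOpts {
    var parameters: Map[String, String] = null      // shared slot: source and sink both write it
    var sinkParameters: Map[String, String] = null  // dedicated slot: only the sink writes it
  }

  def createSink(opts: Map[String, String]): Unit = {
    LastOpts.parameters = opts      // old behaviour: goes through the shared slot
    LastOpts.sinkParameters = opts  // new behaviour: kept separately
  }

  def createSource(opts: Map[String, String]): Unit =
    LastOpts.parameters = opts      // stop() can trigger this after the sink ran

  createSink(Map("opt1" -> "5", "checkpointLocation" -> "/tmp/ckpt"))
  createSource(Map.empty)           // simulates createSource firing during stop()

  // The shared slot was clobbered, which is why asserting on it after stop() was flaky:
  assert(LastOpts.parameters.isEmpty)
  // The dedicated slot still holds exactly what the sink saw:
  assert(LastOpts.sinkParameters("opt1") == "5")
  assert(LastOpts.sinkParameters.contains("checkpointLocation"))
}
```

Because the sink's options live in their own field, the patched test can chain `.start().stop()` and assert afterwards without caring what `createSource` did in between.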
[spark] branch branch-3.0 updated: [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ec45d10  [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite

ec45d10 is described below

commit ec45d10d26621a0541d937bf6850e153b6cd426b
Author: Dongjoon Hyun
AuthorDate: Fri Sep 11 11:48:34 2020 -0700

    [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite

    This PR aims to add `sinkParameter` to check sink options robustly and independently in DataStreamReaderWriterSuite.

    `LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, and `createSink`. However, `StreamingQuery.stop` invokes `queryExecutionThread.join`, `runStream`, and `createSource` immediately, which resets the options stored by `createSink`. To catch the `createSink` options, the test suite currently uses a workaround pattern, and we have observed occasional flakiness with it. If we store the `createSink` options separately, the workaround is unnecessary and the flakiness is eliminated.

    ```scala
    val query = df.writeStream. ... .start()
    assert(LastOptions.parameters(..))
    query.stop()
    ```

    Does this PR introduce any user-facing change?

    No. This is a test-only change.

    How was this patch tested?

    Pass the newly updated test case.

    Closes #29730 from dongjoon-hyun/SPARK-32845.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57)
    Signed-off-by: Dongjoon Hyun
---
 .../test/DataStreamReaderWriterSuite.scala | 23 +++---
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index d90af35..8bf7e27 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -43,11 +43,13 @@ object LastOptions {
   var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
   var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
   var parameters: Map[String, String] = null
+  var sinkParameters: Map[String, String] = null
   var schema: Option[StructType] = null
   var partitionColumns: Seq[String] = Nil
 
   def clear(): Unit = {
     parameters = null
+    sinkParameters = null
     schema = null
     partitionColumns = null
     reset(mockStreamSourceProvider)
@@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       parameters: Map[String, String],
       partitionColumns: Seq[String],
       outputMode: OutputMode): Sink = {
-    LastOptions.parameters = parameters
+    LastOptions.sinkParameters = parameters
     LastOptions.partitionColumns = partitionColumns
     LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
     (_: Long, _: DataFrame) => {}
@@ -169,20 +171,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     LastOptions.clear()
 
-    val query = df.writeStream
+    df.writeStream
      .format("org.apache.spark.sql.streaming.test")
       .option("opt1", "5")
       .options(Map("opt2" -> "4"))
       .options(map)
       .option("checkpointLocation", newMetadataDir)
       .start()
+      .stop()
 
-    assert(LastOptions.parameters("opt1") == "5")
-    assert(LastOptions.parameters("opt2") == "4")
-    assert(LastOptions.parameters("opt3") == "3")
-    assert(LastOptions.parameters.contains("checkpointLocation"))
-
-    query.stop()
+    assert(LastOptions.sinkParameters("opt1") == "5")
+    assert(LastOptions.sinkParameters("opt2") == "4")
+    assert(LastOptions.sinkParameters("opt3") == "3")
+    assert(LastOptions.sinkParameters.contains("checkpointLocation"))
   }
 
   test("SPARK-32832: later option should override earlier options for load()") {
@@ -203,7 +204,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .load()
     assert(LastOptions.parameters.isEmpty)
 
-    val query = ds.writeStream
+    ds.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .option("paTh", "1")
@@ -212,8 +213,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .option("patH", "4")
       .option("path", "5")
       .start()
-    assert(LastOptions.parameters("path") == "5")
-    query.stop()
+      .stop()
+    assert(LastOptions.sinkParameters("path") == "5")
   }
 
   test("partitioning") {
[spark] branch master updated (f6322d1 -> b4be6a6)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from f6322d1  [SPARK-32180][PYTHON][DOCS] Installation page of Getting Started in PySpark documentation
     add b4be6a6  [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite

No new revisions were added by this update.

Summary of changes:
 .../test/DataStreamReaderWriterSuite.scala | 29 +++---
 1 file changed, 15 insertions(+), 14 deletions(-)
[spark] branch branch-2.4 updated: [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new be76ee9 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite be76ee9 is described below commit be76ee96bdb168dc2b44b25c01c3dd6cf7946f6b Author: Dongjoon Hyun AuthorDate: Fri Sep 11 11:48:34 2020 -0700 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite This PR aims to add `sinkParameter` to check sink options robustly and independently in DataStreamReaderWriterSuite `LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, `createSink`. However, `StreamQuery.stop` invokes `queryExecutionThread.join`, `runStream`, `createSource` immediately and reset the stored options by `createSink`. To catch `createSink` options, currently, the test suite is trying a workaround pattern. However, we observed a flakiness in this pattern sometimes. If we split `createSink` option separately, we don't need this workaround and can eliminate this flakiness. ```scala val query = df.writeStream. ... .start() assert(LastOptions.paramters(..)) query.stop() ``` No. This is a test-only change. Pass the newly updated test case. Closes #29730 from dongjoon-hyun/SPARK-32845. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun (cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57) Signed-off-by: Dongjoon Hyun --- .../test/DataStreamReaderWriterSuite.scala | 23 +++--- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 4744aca..38d5c74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -43,11 +43,13 @@ object LastOptions { var mockStreamSourceProvider = mock(classOf[StreamSourceProvider]) var mockStreamSinkProvider = mock(classOf[StreamSinkProvider]) var parameters: Map[String, String] = null + var sinkParameters: Map[String, String] = null var schema: Option[StructType] = null var partitionColumns: Seq[String] = Nil def clear(): Unit = { parameters = null +sinkParameters = null schema = null partitionColumns = null reset(mockStreamSourceProvider) @@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = { -LastOptions.parameters = parameters +LastOptions.sinkParameters = parameters LastOptions.partitionColumns = partitionColumns LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode) new Sink { @@ -171,20 +173,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { LastOptions.clear() -val query = df.writeStream +df.writeStream .format("org.apache.spark.sql.streaming.test") .option("opt1", "5") .options(Map("opt2" -> "4")) .options(map) .option("checkpointLocation", newMetadataDir) .start() + .stop() -assert(LastOptions.parameters("opt1") == "5") -assert(LastOptions.parameters("opt2") == "4") -assert(LastOptions.parameters("opt3") == "3") 
-assert(LastOptions.parameters.contains("checkpointLocation")) - -query.stop() +assert(LastOptions.sinkParameters("opt1") == "5") +assert(LastOptions.sinkParameters("opt2") == "4") +assert(LastOptions.sinkParameters("opt3") == "3") +assert(LastOptions.sinkParameters.contains("checkpointLocation")) } test("SPARK-32832: later option should override earlier options for load()") { @@ -205,7 +206,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .load() assert(LastOptions.parameters.isEmpty) -val query = ds.writeStream +ds.writeStream .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .option("paTh", "1") @@ -214,8 +215,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .option("patH", "4") .option("path", "5") .start() -assert(LastOptions.parameters("path") == "5") -query.stop() + .stop() +assert(LastOptions.sinkParameters("path") == "5") } test("partitioning") { --
[spark] branch branch-3.0 updated: [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new ec45d10 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite ec45d10 is described below commit ec45d10d26621a0541d937bf6850e153b6cd426b Author: Dongjoon Hyun AuthorDate: Fri Sep 11 11:48:34 2020 -0700 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite This PR aims to add `sinkParameter` to check sink options robustly and independently in DataStreamReaderWriterSuite `LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, `createSink`. However, `StreamQuery.stop` invokes `queryExecutionThread.join`, `runStream`, `createSource` immediately and reset the stored options by `createSink`. To catch `createSink` options, currently, the test suite is trying a workaround pattern. However, we observed a flakiness in this pattern sometimes. If we split `createSink` option separately, we don't need this workaround and can eliminate this flakiness. ```scala val query = df.writeStream. ... .start() assert(LastOptions.paramters(..)) query.stop() ``` No. This is a test-only change. Pass the newly updated test case. Closes #29730 from dongjoon-hyun/SPARK-32845. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun (cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57) Signed-off-by: Dongjoon Hyun --- .../test/DataStreamReaderWriterSuite.scala | 23 +++--- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index d90af35..8bf7e27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -43,11 +43,13 @@ object LastOptions { var mockStreamSourceProvider = mock(classOf[StreamSourceProvider]) var mockStreamSinkProvider = mock(classOf[StreamSinkProvider]) var parameters: Map[String, String] = null + var sinkParameters: Map[String, String] = null var schema: Option[StructType] = null var partitionColumns: Seq[String] = Nil def clear(): Unit = { parameters = null +sinkParameters = null schema = null partitionColumns = null reset(mockStreamSourceProvider) @@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = { -LastOptions.parameters = parameters +LastOptions.sinkParameters = parameters LastOptions.partitionColumns = partitionColumns LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode) (_: Long, _: DataFrame) => {} @@ -169,20 +171,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { LastOptions.clear() -val query = df.writeStream +df.writeStream .format("org.apache.spark.sql.streaming.test") .option("opt1", "5") .options(Map("opt2" -> "4")) .options(map) .option("checkpointLocation", newMetadataDir) .start() + .stop() -assert(LastOptions.parameters("opt1") == "5") -assert(LastOptions.parameters("opt2") == "4") -assert(LastOptions.parameters("opt3") == "3") 
-assert(LastOptions.parameters.contains("checkpointLocation")) - -query.stop() +assert(LastOptions.sinkParameters("opt1") == "5") +assert(LastOptions.sinkParameters("opt2") == "4") +assert(LastOptions.sinkParameters("opt3") == "3") +assert(LastOptions.sinkParameters.contains("checkpointLocation")) } test("SPARK-32832: later option should override earlier options for load()") { @@ -203,7 +204,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .load() assert(LastOptions.parameters.isEmpty) -val query = ds.writeStream +ds.writeStream .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .option("paTh", "1") @@ -212,8 +213,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .option("patH", "4") .option("path", "5") .start() -assert(LastOptions.parameters("path") == "5") -query.stop() + .stop() +assert(LastOptions.sinkParameters("path") == "5") } test("partitioning") { ---
[spark] branch master updated (f6322d1 -> b4be6a6)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from f6322d1 [SPARK-32180][PYTHON][DOCS] Installation page of Getting Started in PySpark documentation add b4be6a6 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite No new revisions were added by this update. Summary of changes: .../test/DataStreamReaderWriterSuite.scala | 29 +++--- 1 file changed, 15 insertions(+), 14 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new ec45d10 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite ec45d10 is described below commit ec45d10d26621a0541d937bf6850e153b6cd426b Author: Dongjoon Hyun AuthorDate: Fri Sep 11 11:48:34 2020 -0700 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite This PR aims to add `sinkParameter` to check sink options robustly and independently in DataStreamReaderWriterSuite `LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, `createSink`. However, `StreamQuery.stop` invokes `queryExecutionThread.join`, `runStream`, `createSource` immediately and reset the stored options by `createSink`. To catch `createSink` options, currently, the test suite is trying a workaround pattern. However, we observed a flakiness in this pattern sometimes. If we split `createSink` option separately, we don't need this workaround and can eliminate this flakiness. ```scala val query = df.writeStream. ... .start() assert(LastOptions.paramters(..)) query.stop() ``` No. This is a test-only change. Pass the newly updated test case. Closes #29730 from dongjoon-hyun/SPARK-32845. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun (cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57) Signed-off-by: Dongjoon Hyun --- .../test/DataStreamReaderWriterSuite.scala | 23 +++--- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index d90af35..8bf7e27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -43,11 +43,13 @@ object LastOptions { var mockStreamSourceProvider = mock(classOf[StreamSourceProvider]) var mockStreamSinkProvider = mock(classOf[StreamSinkProvider]) var parameters: Map[String, String] = null + var sinkParameters: Map[String, String] = null var schema: Option[StructType] = null var partitionColumns: Seq[String] = Nil def clear(): Unit = { parameters = null +sinkParameters = null schema = null partitionColumns = null reset(mockStreamSourceProvider) @@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = { -LastOptions.parameters = parameters +LastOptions.sinkParameters = parameters LastOptions.partitionColumns = partitionColumns LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode) (_: Long, _: DataFrame) => {} @@ -169,20 +171,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { LastOptions.clear() -val query = df.writeStream +df.writeStream .format("org.apache.spark.sql.streaming.test") .option("opt1", "5") .options(Map("opt2" -> "4")) .options(map) .option("checkpointLocation", newMetadataDir) .start() + .stop() -assert(LastOptions.parameters("opt1") == "5") -assert(LastOptions.parameters("opt2") == "4") -assert(LastOptions.parameters("opt3") == "3") 
-assert(LastOptions.parameters.contains("checkpointLocation")) - -query.stop() +assert(LastOptions.sinkParameters("opt1") == "5") +assert(LastOptions.sinkParameters("opt2") == "4") +assert(LastOptions.sinkParameters("opt3") == "3") +assert(LastOptions.sinkParameters.contains("checkpointLocation")) } test("SPARK-32832: later option should override earlier options for load()") { @@ -203,7 +204,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .load() assert(LastOptions.parameters.isEmpty) -val query = ds.writeStream +ds.writeStream .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .option("paTh", "1") @@ -212,8 +213,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .option("patH", "4") .option("path", "5") .start() -assert(LastOptions.parameters("path") == "5") -query.stop() + .stop() +assert(LastOptions.sinkParameters("path") == "5") } test("partitioning") { ---
[spark] branch master updated (f6322d1 -> b4be6a6)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from f6322d1 [SPARK-32180][PYTHON][DOCS] Installation page of Getting Started in PySpark documentation add b4be6a6 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite No new revisions were added by this update. Summary of changes: .../test/DataStreamReaderWriterSuite.scala | 29 +++--- 1 file changed, 15 insertions(+), 14 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new ec45d10 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite ec45d10 is described below commit ec45d10d26621a0541d937bf6850e153b6cd426b Author: Dongjoon Hyun AuthorDate: Fri Sep 11 11:48:34 2020 -0700 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite This PR aims to add `sinkParameter` to check sink options robustly and independently in DataStreamReaderWriterSuite `LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, `createSink`. However, `StreamQuery.stop` invokes `queryExecutionThread.join`, `runStream`, `createSource` immediately and reset the stored options by `createSink`. To catch `createSink` options, currently, the test suite is trying a workaround pattern. However, we observed a flakiness in this pattern sometimes. If we split `createSink` option separately, we don't need this workaround and can eliminate this flakiness. ```scala val query = df.writeStream. ... .start() assert(LastOptions.paramters(..)) query.stop() ``` No. This is a test-only change. Pass the newly updated test case. Closes #29730 from dongjoon-hyun/SPARK-32845. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun (cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57) Signed-off-by: Dongjoon Hyun --- .../test/DataStreamReaderWriterSuite.scala | 23 +++--- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index d90af35..8bf7e27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -43,11 +43,13 @@ object LastOptions { var mockStreamSourceProvider = mock(classOf[StreamSourceProvider]) var mockStreamSinkProvider = mock(classOf[StreamSinkProvider]) var parameters: Map[String, String] = null + var sinkParameters: Map[String, String] = null var schema: Option[StructType] = null var partitionColumns: Seq[String] = Nil def clear(): Unit = { parameters = null +sinkParameters = null schema = null partitionColumns = null reset(mockStreamSourceProvider) @@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = { -LastOptions.parameters = parameters +LastOptions.sinkParameters = parameters LastOptions.partitionColumns = partitionColumns LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode) (_: Long, _: DataFrame) => {} @@ -169,20 +171,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { LastOptions.clear() -val query = df.writeStream +df.writeStream .format("org.apache.spark.sql.streaming.test") .option("opt1", "5") .options(Map("opt2" -> "4")) .options(map) .option("checkpointLocation", newMetadataDir) .start() + .stop() -assert(LastOptions.parameters("opt1") == "5") -assert(LastOptions.parameters("opt2") == "4") -assert(LastOptions.parameters("opt3") == "3") 
-assert(LastOptions.parameters.contains("checkpointLocation")) - -query.stop() +assert(LastOptions.sinkParameters("opt1") == "5") +assert(LastOptions.sinkParameters("opt2") == "4") +assert(LastOptions.sinkParameters("opt3") == "3") +assert(LastOptions.sinkParameters.contains("checkpointLocation")) } test("SPARK-32832: later option should override earlier options for load()") { @@ -203,7 +204,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .load() assert(LastOptions.parameters.isEmpty) -val query = ds.writeStream +ds.writeStream .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .option("paTh", "1") @@ -212,8 +213,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .option("patH", "4") .option("path", "5") .start() -assert(LastOptions.parameters("path") == "5") -query.stop() + .stop() +assert(LastOptions.sinkParameters("path") == "5") } test("partitioning") { ---
[spark] branch master updated (f6322d1 -> b4be6a6)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from f6322d1 [SPARK-32180][PYTHON][DOCS] Installation page of Getting Started in PySpark documentation add b4be6a6 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite No new revisions were added by this update. Summary of changes: .../test/DataStreamReaderWriterSuite.scala | 29 +++--- 1 file changed, 15 insertions(+), 14 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (94cac59 -> f6322d1)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 94cac59  [SPARK-32730][SQL][FOLLOW-UP] Improve LeftAnti SortMergeJoin right side buffering
 add f6322d1  [SPARK-32180][PYTHON][DOCS] Installation page of Getting Started in PySpark documentation

No new revisions were added by this update.

Summary of changes:
 python/docs/source/getting_started/index.rst      | 3 +
 .../docs/source/getting_started/installation.rst  | 114 +
 2 files changed, 117 insertions(+)
 create mode 100644 python/docs/source/getting_started/installation.rst
[GitHub] [spark-website] shivaram commented on pull request #289: Release 3.0.1
shivaram commented on pull request #289:
URL: https://github.com/apache/spark-website/pull/289#issuecomment-691143254

   @zhengruifeng sorry for the delay -- I'll get to this today
[GitHub] [spark-website] dongjoon-hyun commented on pull request #289: Release 3.0.1
dongjoon-hyun commented on pull request #289:
URL: https://github.com/apache/spark-website/pull/289#issuecomment-691114428

   @zhengruifeng . Sorry, I'm not sure about the SparkR release process either. The `SparkR` package info has a `Maintainer` field, like the following.
   - https://github.com/apache/spark/blob/master/R/pkg/DESCRIPTION#L6-L11
   ```
   Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
                       email = "shiva...@cs.berkeley.edu"),
                person("Xiangrui", "Meng", role = "aut", email = "m...@databricks.com"),
                person("Felix", "Cheung", role = "aut", email = "felixche...@apache.org"),
                person(family = "The Apache Software Foundation", role = c("aut", "cph")))
   ```
   @shivaram , @mengxr , @felixcheung . Could you give us some advice?
[spark] branch master updated (9f4f49c -> 94cac59)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 9f4f49c  [SPARK-32853][SQL] Consecutive save/load calls in DataFrame/StreamReader/Writer should not fail
 add 94cac59  [SPARK-32730][SQL][FOLLOW-UP] Improve LeftAnti SortMergeJoin right side buffering

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala | 3 ++-
 sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala     | 8
 2 files changed, 10 insertions(+), 1 deletion(-)
[spark] branch master updated (fe2ab25 -> 9f4f49c)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from fe2ab25  [MINOR][SQL] Fix a typo at 'spark.sql.sources.fileCompressionFactor' error message in SQLConf
 add 9f4f49c  [SPARK-32853][SQL] Consecutive save/load calls in DataFrame/StreamReader/Writer should not fail

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/DataFrameReader.scala  | 34 ++
 .../org/apache/spark/sql/DataFrameWriter.scala  | 41 +++---
 .../spark/sql/streaming/DataStreamReader.scala  | 19 +++---
 .../spark/sql/streaming/DataStreamWriter.scala  | 27 +-
 .../test/DataStreamReaderWriterSuite.scala      | 15
 .../sql/test/DataFrameReaderWriterSuite.scala   | 9 +
 6 files changed, 103 insertions(+), 42 deletions(-)
[spark] branch master updated (328d81a -> fe2ab25)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 328d81a  [SPARK-32677][SQL][DOCS][MINOR] Improve code comment in CreateFunctionCommand
 add fe2ab25  [MINOR][SQL] Fix a typo at 'spark.sql.sources.fileCompressionFactor' error message in SQLConf

No new revisions were added by this update.

Summary of changes:
 sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[spark] branch branch-2.4 updated: [SPARK-32794][SS] Fixed rare corner case error in micro-batch engine with some stateful queries + no-data-batches + V1 sources
This is an automated email from the ASF dual-hosted git repository.

tdas pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new c82b6e4  [SPARK-32794][SS] Fixed rare corner case error in micro-batch engine with some stateful queries + no-data-batches + V1 sources
c82b6e4 is described below

commit c82b6e4bd31bd4c0447a80caffadebff98baa63e
Author: Tathagata Das
AuthorDate: Fri Sep 11 03:05:39 2020 -0400

    [SPARK-32794][SS] Fixed rare corner case error in micro-batch engine with some stateful queries + no-data-batches + V1 sources

    ### What changes were proposed in this pull request?

    Make MicroBatchExecution explicitly call `getBatch` when the start and end offsets are the same.

    ### Why are the changes needed?

    The Structured Streaming micro-batch engine has a contract with V1 data sources that, after a restart, it will call `source.getBatch()` on the last batch attempted before the restart. However, a very rare combination of sequences violates this contract. It occurs only when
    - The streaming query has specific types of stateful operations with watermarks (e.g., aggregation in append mode, mapGroupsWithState with timeouts).
        - Such queries can execute a batch even without new data, when the previous batch updates the watermark and the stateful ops are such that the new watermark can cause new output/cleanup. Such batches are called no-data-batches.
    - The last batch before termination was an incomplete no-data-batch.

    Upon restart, the micro-batch engine fails to call `source.getBatch` when attempting to re-execute the incomplete no-data-batch. This occurs because a no-data-batch has the same start and end offsets, and when a batch is executed, the call to `source.getBatch` is skipped if the start and end offsets are the same, on the assumption that the generated plan will be empty. This only affects V1 data sources, which rely on this invariant to detect in the source whether the query is being started from scratch or restarted.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    New unit test with a mock v1 source that fails without the fix.

    Closes #29700 from tdas/SPARK-32794-2.4.

    Authored-by: Tathagata Das
    Signed-off-by: Tathagata Das
---
 .../execution/streaming/MicroBatchExecution.scala  |  11 ++
 .../streaming/MicroBatchExecutionSuite.scala       | 123 -
 .../apache/spark/sql/streaming/StreamTest.scala    |   8 ++
 3 files changed, 141 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 3bcc26b..0834491 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -291,6 +291,17 @@ class MicroBatchExecution(
           committedOffsets ++= availableOffsets
           watermarkTracker.setWatermark(
             math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
+        } else if (latestCommittedBatchId == latestBatchId - 1) {
+          availableOffsets.foreach {
+            case (source: Source, end: Offset) =>
+              val start = committedOffsets.get(source).map(_.asInstanceOf[Offset])
+              if (start.map(_ == end).getOrElse(true)) {
+                source.getBatch(start, end)
+              }
+            case nonV1Tuple =>
+              // The V2 API does not have the same edge case requiring getBatch to be called
+              // here, so we do nothing here.
+          }
         } else if (latestCommittedBatchId < latestBatchId - 1) {
           logWarning(s"Batch completion log latest batch id is " +
             s"${latestCommittedBatchId}, which is not trailing " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index c228740..fc84175 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.execution.streaming

 import org.scalatest.BeforeAndAfter

+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.functions.{count, window}
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming._
+
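To make the V1 contract concrete, here is a hedged sketch, not the mock source from the actual test, of a minimal V1 `Source` that relies on the guarantee this fix restores: after a restart the engine re-invokes `getBatch(start, end)` on the last attempted batch, and for a no-data-batch `start == end`, so the call itself signals a restart. The class name, the detection flag, and the placeholder DataFrame are illustrative assumptions.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical V1 source; illustrates the restart contract only.
class RestartAwareSource(sqlContext: SQLContext) extends Source {
  // Set when getBatch is re-invoked on an already-committed range,
  // which (with the SPARK-32794 fix) also happens for the incomplete
  // no-data-batch re-executed after a restart.
  @volatile var restartDetected = false

  override def schema: StructType = StructType(StructField("value", LongType) :: Nil)

  // A trivial source that never advances past offset 0.
  override def getOffset: Option[Offset] = Some(LongOffset(0))

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    // Before the fix, a restarted incomplete no-data-batch skipped this
    // call entirely because start == end; after the fix the engine calls
    // it explicitly, so the source observes the restart.
    if (start.contains(end)) {
      restartDetected = true
    }
    // Placeholder result; a real source must return a streaming DataFrame
    // (the actual test uses internal test-only helpers for this).
    sqlContext.sparkSession.emptyDataFrame
  }

  override def stop(): Unit = ()
}
```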