This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new ed9749b  [SPARK-33659][SS] Document the current behavior for DataStreamWriter.toTable API
ed9749b is described below

commit ed9749b728e373d05f84ebd6c6b81b627657cebb
Author: Yuanjian Li <yuanjian...@databricks.com>
AuthorDate: Thu Dec 24 12:44:37 2020 +0900

    [SPARK-33659][SS] Document the current behavior for DataStreamWriter.toTable API

    ### What changes were proposed in this pull request?

    Follow-up work for #30521: document the following behaviors in the API doc.
    - Clarify what happens when the given configurations (provider/partitionBy) conflict with the existing table.
    - Document the limited functionality when this API creates a v2 table, and advise users to create the table beforehand so that a table with unintended or insufficient information is not created.

    ### Why are the changes needed?

    We do not yet have full support for v2 tables created through this API. (TODO: SPARK-33638)

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Documentation-only change.

    Closes #30885 from xuanyuanking/SPARK-33659.

    Authored-by: Yuanjian Li <yuanjian...@databricks.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit 86c1cfc5791dae5f2ee8ccd5095dbeb2243baba6)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 python/pyspark/sql/streaming.py                             | 13 ++++++++++---
 .../org/apache/spark/sql/streaming/DataStreamWriter.scala   | 14 ++++++++++++--
 .../spark/sql/streaming/test/DataStreamTableAPISuite.scala  |  6 +++---
 3 files changed, 25 insertions(+), 8 deletions(-)
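As an illustration of the recommendation documented below, here is a minimal Scala sketch of pre-creating the target table before starting the stream, so that toTable() never has to create it. The table name, schema, source, and checkpoint path are illustrative assumptions, not taken from this patch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("toTable-precreate-sketch")
  .master("local[2]")
  .getOrCreate()

// Create the target table up front with the full definition we want
// (custom properties, options, etc. could be added here), so the
// streaming write never creates an incomplete one. The schema matches
// the built-in "rate" test source: (timestamp TIMESTAMP, value BIGINT).
spark.sql(
  """CREATE TABLE IF NOT EXISTS output_table (timestamp TIMESTAMP, value BIGINT)
    |USING parquet""".stripMargin)

val query = spark.readStream
  .format("rate")   // test source that emits (timestamp, value) rows
  .load()
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/toTable-checkpoint")  // illustrative path
  .toTable("output_table")  // returns a StreamingQuery

query.awaitTermination()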
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 5f12229..51941a6 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -1498,8 +1498,7 @@ class DataStreamWriter(object):
         Starts the execution of the streaming query, which will continually output results to the
         given table as new data arrives.
 
-        A new table will be created if the table not exists. The returned
-        :class:`StreamingQuery` object can be used to interact with the stream.
+        The returned :class:`StreamingQuery` object can be used to interact with the stream.
 
         .. versionadded:: 3.1.0
 
@@ -1531,6 +1530,15 @@ class DataStreamWriter(object):
         -----
         This API is evolving.
 
+        For v1 table, partitioning columns provided by `partitionBy` will be respected no matter
+        the table exists or not. A new table will be created if the table not exists.
+
+        For v2 table, `partitionBy` will be ignored if the table already exists. `partitionBy` will
+        be respected only if the v2 table does not exist. Besides, the v2 table created by this API
+        lacks some functionalities (e.g., customized properties, options, and serde info). If you
+        need them, please create the v2 table manually before the execution to avoid creating a
+        table with incomplete information.
+
         Examples
         --------
         >>> sdf.writeStream.format('parquet').queryName('query').toTable('output_table')
@@ -1543,7 +1551,6 @@ class DataStreamWriter(object):
         ...     format='parquet',
         ...     checkpointLocation='/tmp/checkpoint')  # doctest: +SKIP
         """
-        # TODO(SPARK-33659): document the current behavior for DataStreamWriter.toTable API
         self.options(**options)
         if outputMode is not None:
             self.outputMode(outputMode)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 9e8dff3..46ab1de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -302,11 +302,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   /**
    * Starts the execution of the streaming query, which will continually output results to the given
-   * table as new data arrives. A new table will be created if the table not exists. The returned
-   * [[StreamingQuery]] object can be used to interact with the stream.
+   * table as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
+   * the stream.
+   *
+   * For v1 table, partitioning columns provided by `partitionBy` will be respected no matter the
+   * table exists or not. A new table will be created if the table not exists.
+   *
+   * For v2 table, `partitionBy` will be ignored if the table already exists. `partitionBy` will be
+   * respected only if the v2 table does not exist. Besides, the v2 table created by this API lacks
+   * some functionalities (e.g., customized properties, options, and serde info). If you need them,
+   * please create the v2 table manually before the execution to avoid creating a table with
+   * incomplete information.
    *
    * @since 3.1.0
    */
+  @Evolving
   @throws[TimeoutException]
   def toTable(tableName: String): StreamingQuery = {
     this.tableName = tableName
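To make the partitionBy interaction documented above concrete, a short sketch under the same illustrative assumptions (rate source, hypothetical names and paths): for a v1 table, or a table that does not yet exist, the partitioning below is applied when the table is created; for a pre-existing v2 table it would be ignored:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.hour

val spark = SparkSession.builder()
  .appName("toTable-partitionBy-sketch")
  .master("local[2]")
  .getOrCreate()
import spark.implicits._

val query = spark.readStream
  .format("rate")
  .load()
  .withColumn("hour", hour($"timestamp"))  // derive a partition column
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/toTable-checkpoint-2")  // illustrative path
  .partitionBy("hour")  // honored for v1 tables; for v2 tables only if the
                        // table does not exist yet
  .toTable("partitioned_output")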
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 9cf6496..4c5c5e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -275,7 +275,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     val tableName = "stream_test"
     withTable(tableName) {
       // The file written by batch will not be seen after the table was written by a streaming
-      // query. This is because we loads files from the metadata log instead of listing them
+      // query. This is because we load files from the metadata log instead of listing them
       // using HDFS API.
       Seq(4, 5, 6).toDF("value").write.format("parquet")
         .option("path", dir.getCanonicalPath).saveAsTable(tableName)
@@ -289,7 +289,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     val tableName = "stream_test"
     withTable(tableName) {
       // The file written by batch will not be seen after the table was written by a streaming
-      // query. This is because we loads files from the metadata log instead of listing them
+      // query. This is because we load files from the metadata log instead of listing them
       // using HDFS API.
       Seq(4, 5, 6).toDF("value").write.format("parquet").saveAsTable(tableName)
 
@@ -302,7 +302,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     val tableName = "stream_test"
     withTable(tableName) {
       // The file written by batch will not be seen after the table was written by a streaming
-      // query. This is because we loads files from the metadata log instead of listing them
+      // query. This is because we load files from the metadata log instead of listing them
       // using HDFS API.
       Seq(4, 5, 6).toDF("value").write
         .mode("append").format("parquet").save(dir.getCanonicalPath)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org