This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e5e2b914de6 [MINOR][SS][DOCS] Fix typos in the Scaladoc and make the 
semantic of getCurrentWatermarkMs explicit
e5e2b914de6 is described below

commit e5e2b914de6a498ae191bdb0d02308c5b6f13f15
Author: bartosz25 <bartkoniec...@yahoo.fr>
AuthorDate: Sat Jul 15 08:31:31 2023 -0500

    [MINOR][SS][DOCS] Fix typos in the Scaladoc and make the semantic of 
getCurrentWatermarkMs explicit
    
    ### What changes were proposed in this pull request?
    Improve the code comments:
    1. Rate micro-batch data source Scaladoc parameters aren't consistent with 
the options really supported by this data source.
    2. The `getCurrentWatermarkMs` has a special semantic for the 1st 
micro-batch when the watermark is not set yet. IMO, it should return 
`Option[Long]`, hence `None` instead of `0` for the first micro-batch, but 
since it's a breaking change, I preferred to add a note on that instead.
    
    ### Why are the changes needed?
    1. Avoid confusion while using the classes and methods.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    The tests weren't added because the change is only at the Scaladoc level. I 
affirm that the contribution is my original work and that I license the work to 
the project under the project's open source license.
    
    Closes #41988 from bartosz25/comments_fixes.
    
    Authored-by: bartosz25 <bartkoniec...@yahoo.fr>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../sql/execution/streaming/sources/RatePerMicroBatchProvider.scala  | 4 ++--
 .../src/main/scala/org/apache/spark/sql/streaming/GroupState.scala   | 5 +++++
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
index ccf8b0a7b92..41878a6a549 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
@@ -34,11 +34,11 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  *  with 0L.
  *
  *  This source supports the following options:
- *  - `rowsPerMicroBatch` (e.g. 100): How many rows should be generated per 
micro-batch.
+ *  - `rowsPerBatch` (e.g. 100): How many rows should be generated per 
micro-batch.
  *  - `numPartitions` (e.g. 10, default: Spark's default parallelism): The 
partition number for the
  *    generated rows.
  *  - `startTimestamp` (e.g. 1000, default: 0): starting value of generated 
time
- *  - `advanceMillisPerMicroBatch` (e.g. 1000, default: 1000): the amount of 
time being advanced in
+ *  - `advanceMillisPerBatch` (e.g. 1000, default: 1000): the amount of time 
being advanced in
  *    generated time on each micro-batch.
  *
  *  Unlike `rate` data source, this data source provides a consistent set of 
input rows per
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
index 2c8f1db74f8..f08a2fd3cc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
@@ -315,6 +315,11 @@ trait GroupState[S] extends LogicalGroupState[S] {
    *
    * @note In a streaming query, this can be called only when watermark is set 
before calling
    *       `[map/flatMap]GroupsWithState`. In a batch query, this method 
always returns -1.
+   * @note The watermark gets propagated in the end of each query. As a 
result, this method will
+   *       return 0 (1970-01-01T00:00:00) for the first micro-batch. If you 
use this value
+   *       as a part of the timestamp set in the `setTimeoutTimestamp`, it may 
lead to the
+   *       state expiring immediately in the next micro-batch, once the 
watermark gets the
+   *       real value from your data.
    */
   @throws[UnsupportedOperationException](
     "if watermark has not been set before in [map|flatMap]GroupsWithState")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to