Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144972248
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
    @@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
       /** Get the state value as a scala Option. */
       def getOption: Option[S]
     
    -  /**
    -   * Update the value of the state. Note that `null` is not a valid value, 
and it throws
    -   * IllegalArgumentException.
    -   */
    -  @throws[IllegalArgumentException]("when updating with null")
    +  /** Update the value of the state. */
       def update(newState: S): Unit
     
       /** Remove this state. */
       def remove(): Unit
     
       /**
        * Whether the function has been called because the key has timed out.
    -   * @note This can return true only when timeouts are enabled in 
`[map/flatmap]GroupsWithStates`.
    +   * @note This can return true only when timeouts are enabled in 
`[map/flatmap]GroupsWithState`.
        */
       def hasTimedOut: Boolean
     
    +
       /**
        * Set the timeout duration in ms for this key.
        *
    -   * @note ProcessingTimeTimeout must be enabled in 
`[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'durationMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
    +    "if processing time timeout has not been enabled in 
[map|flatMap]GroupsWithState")
       def setTimeoutDuration(durationMs: Long): Unit
     
    +
       /**
        * Set the timeout duration for this key as a string. For example, "1 
hour", "2 days", etc.
        *
    -   * @note ProcessingTimeTimeout must be enabled in 
`[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'duration' is not a valid 
duration")
    -  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
    +    "if processing time timeout has not been enabled in 
[map|flatMap]GroupsWithState")
       def setTimeoutDuration(duration: String): Unit
     
    -  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in 
`[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'timestampMs' is not positive or less than the current watermark 
in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if processing time timeout has not been enabled in 
[map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time 
and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be 
older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in 
`[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'additionalDuration' is invalid or the final timeout timestamp is 
less than " +
    +      "the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in 
[map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): 
Unit
     
    -  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in 
`[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in 
[map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date and an 
additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be 
older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in 
`[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *      `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in 
[map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: 
String): Unit
    +
    +
    +  /**
    +   * Get the current event time watermark as milliseconds in epoch time.
    +   *
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method returns -1 when calling inside a batch query.
    +   */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in 
[map|flatMap]GroupsWithState")
    +  def getCurrentWatermarkMs(): Long
    --- End diff --
    
    Introducing a new API for that purpose is a bigger question. In general, if 
we want to expose a general piece of information that may be used in any 
operation, then we introduce it as a sql function like "current_timestamp". 
That is orthogonal to the requirement here; we can still add that if we find 
the need for it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to