Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144972248 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala --- @@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] { /** Get the state value as a scala Option. */ def getOption: Option[S] - /** - * Update the value of the state. Note that `null` is not a valid value, and it throws - * IllegalArgumentException. - */ - @throws[IllegalArgumentException]("when updating with null") + /** Update the value of the state. */ def update(newState: S): Unit /** Remove this state. */ def remove(): Unit /** * Whether the function has been called because the key has timed out. - * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`. + * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`. */ def hasTimedOut: Boolean + /** * Set the timeout duration in ms for this key. * - * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Processing time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. */ @throws[IllegalArgumentException]("if 'durationMs' is not positive") - @throws[IllegalStateException]("when state is either not initialized, or already removed") @throws[UnsupportedOperationException]( - "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutDuration(durationMs: Long): Unit + /** * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc. * - * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Processing time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. */ @throws[IllegalArgumentException]("if 'duration' is not a valid duration") - @throws[IllegalStateException]("when state is either not initialized, or already removed") @throws[UnsupportedOperationException]( - "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutDuration(duration: String): Unit - @throws[IllegalArgumentException]("if 'timestampMs' is not positive") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( - "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + /** * Set the timeout timestamp for this key as milliseconds in epoch time. * This timestamp cannot be older than the current watermark. * - * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Event time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. */ + @throws[IllegalArgumentException]( + "if 'timestampMs' is not positive or less than the current watermark in a streaming query") + @throws[UnsupportedOperationException]( + "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutTimestamp(timestampMs: Long): Unit - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( - "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + /** * Set the timeout timestamp for this key as milliseconds in epoch time and an additional * duration as a string (e.g. "1 hour", "2 days", etc.). * The final timestamp (including the additional duration) cannot be older than the * current watermark. * - * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Event time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no side effect when used in a batch query. */ + @throws[IllegalArgumentException]( + "if 'additionalDuration' is invalid or the final timeout timestamp is less than " + + "the current watermark in a streaming query") + @throws[UnsupportedOperationException]( + "if event time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( - "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + /** * Set the timeout timestamp for this key as a java.sql.Date. * This timestamp cannot be older than the current watermark. * - * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Event time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no side effect when used in a batch query. */ + @throws[UnsupportedOperationException]( + "if event time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutTimestamp(timestamp: java.sql.Date): Unit - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( - "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + /** * Set the timeout timestamp for this key as a java.sql.Date and an additional * duration as a string (e.g. "1 hour", "2 days", etc.). * The final timestamp (including the additional duration) cannot be older than the * current watermark. * - * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Event time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no side effect when used in a batch query. */ + @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") + @throws[UnsupportedOperationException]( + "if event time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit + + + /** + * Get the current event time watermark as milliseconds in epoch time. + * + * @note [[GroupStateTimeout Event time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method returns -1 when calling inside a batch query. + */ + @throws[UnsupportedOperationException]( + "if event time timeout has not been enabled in [map|flatMap]GroupsWithState") + def getCurrentWatermarkMs(): Long --- End diff -- Introducing a new API for that purpose is a bigger question. In general, if we want to expose a general piece of information that may be used in any operation, then we introduce it as a sql function like "current_timestamp". That is orthogonal to requirement here, we can still add that if we find the need for them
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org