[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19495

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145290193

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---
@@ -61,6 +61,10 @@ case class FlatMapGroupsWithStateExec(
   private val isTimeoutEnabled = timeoutConf != NoTimeout
   val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled)
+  val watermarkPresent = child.output.exists {
--- End diff --

Correction. No, it is not. When a watermark is not defined in the query, the `eventTimeWatermark` value is `Some(0)`.
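The distinction in this correction can be sketched in a few lines. This is a minimal model, not Spark's code: `Attr` is a hypothetical stand-in for a Catalyst output attribute, and `DelayKey` is an assumed name for the metadata key that marks the watermarked event-time column. It shows why `isDefined` cannot separate the two cases when the watermark value defaults to `Some(0)`, while scanning the child's output columns can.

```scala
// Hypothetical stand-in for a Catalyst output attribute: only the name and metadata keys matter here.
final case class Attr(name: String, metadataKeys: Set[String])

object WatermarkCheck {
  // Assumed metadata key that marks the event-time column once a watermark is defined.
  val DelayKey: String = "spark.watermarkDelayMs"

  // Mirrors the idea behind `child.output.exists { ... }`: a watermark counts as present
  // only if some input column carries the watermark metadata.
  def watermarkPresent(output: Seq[Attr]): Boolean =
    output.exists(_.metadataKeys.contains(DelayKey))

  // The check questioned in review: if the watermark value defaults to Some(0)
  // when no watermark is set, this returns true either way.
  def isDefinedCheck(eventTimeWatermark: Option[Long]): Boolean =
    eventTimeWatermark.isDefined
}
```

The design point is that presence is derived from the plan (which columns were marked), not from a runtime value that always carries a default.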
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145285750

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,122 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
   /** Remove this state. */
   def remove(): Unit
   /**
    * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
    */
   def hasTimedOut: Boolean
+
   /**
    * Set the timeout duration in ms for this key.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'durationMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(durationMs: Long): Unit
+
   /**
    * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(duration: String): Unit
-  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time.
    * This timestamp cannot be older than the current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
+  @throws[IllegalArgumentException](
+    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
+  @throws[UnsupportedOperationException](
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutTimestamp(timestampMs: Long): Unit
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
    * duration as a string (e.g. "1 hour", "2 days", etc.).
    * The final timestamp (including the additional duration) cannot be older than the
    * current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no side ef
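The scaladoc above reduces the `setTimeoutDuration(durationMs: Long)` contract to three rules: processing time timeout must be enabled, the duration must be positive, and the call has no effect in a batch query. A minimal sketch of those rules follows. `DurationTimeoutSketch` and its constructor parameters are hypothetical, and modelling a batch query as `None` for the current processing time is an assumption, not how `GroupStateImpl` is written.

```scala
// Hypothetical sketch of the setTimeoutDuration(durationMs) contract described in the scaladoc.
// Not Spark's GroupStateImpl: the flag and field names are illustrative assumptions.
final class DurationTimeoutSketch(
    processingTimeTimeoutEnabled: Boolean,
    currentProcessingTimeMs: Option[Long]) { // None models a batch query (no processing time)

  // Absolute timeout timestamp in ms, if one has been set.
  private var timeoutTimestampMs: Option[Long] = None

  def timeoutTimestamp: Option[Long] = timeoutTimestampMs

  def setTimeoutDuration(durationMs: Long): Unit = {
    if (!processingTimeTimeoutEnabled) {
      throw new UnsupportedOperationException(
        "Cannot set timeout duration without enabling processing time timeout in " +
          "[map|flatMap]GroupsWithState")
    }
    if (durationMs <= 0) {
      throw new IllegalArgumentException("Timeout duration must be positive")
    }
    // In a batch query there is no processing time, so the call has no effect.
    currentProcessingTimeMs.foreach { now => timeoutTimestampMs = Some(now + durationMs) }
  }
}
```

Checking the configuration first means a misconfigured query fails loudly even in batch mode, while a correctly configured batch query silently ignores the call, as the `@note` describes.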
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145283763

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala ---
@@ -119,32 +116,34 @@ private[sql] class GroupStateImpl[S] private(
     timeoutTimestamp = timestampMs
   }
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
   }
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime)
   }
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
   }
+  override def getCurrentWatermarkMs(): Long = {
+    if (!watermarkPresent) {
+      throw new UnsupportedOperationException(
+        "Cannot get event time watermark timestamp without enabling setting watermark before " +
--- End diff --

yes. agreed.
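A sketch of the getter in the shape this revision takes, gated on whether a watermark was set rather than on the timeout configuration. `WatermarkGetterSketch` is hypothetical, and the error text uses the rewording suggested in review ("without setting watermark"), which is an assumption, not necessarily the message that was merged.

```scala
// Hypothetical sketch: the watermark getter is gated on whether the query defined a watermark,
// not on the timeout configuration. Names are illustrative, not Spark's.
final class WatermarkGetterSketch(watermarkPresent: Boolean, eventTimeWatermarkMs: Long) {
  def getCurrentWatermarkMs(): Long = {
    if (!watermarkPresent) {
      // Wording follows the review suggestion; assumed, not quoted from the final code.
      throw new UnsupportedOperationException(
        "Cannot get event time watermark timestamp without setting watermark before " +
          "[map|flatMap]GroupsWithState")
    }
    eventTimeWatermarkMs
  }
}
```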
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145283802

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---
@@ -61,6 +61,10 @@ case class FlatMapGroupsWithStateExec(
   private val isTimeoutEnabled = timeoutConf != NoTimeout
   val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled)
+  val watermarkPresent = child.output.exists {
--- End diff --

yes. agreed.
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145283034

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
@@ -1086,4 +1181,24 @@ object FlatMapGroupsWithStateSuite {
     override def metrics: StateStoreMetrics = new StateStoreMetrics(map.size, 0, Map.empty)
     override def hasCommitted: Boolean = true
   }
+
+  def assertCanGetProcessingTime(predicate: => Boolean): Unit = {
+    if (!predicate) throw new TestFailedException("Could not get processing time", 20)
--- End diff --

what is the `20`?
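On the `20`: as far as we know, ScalaTest's `TestFailedException` has a constructor taking a message and a `failedCodeStackDepth`, i.e. how many frames up the exception's stack trace the failing line of test code sits, which ScalaTest uses to report the failure location. The sketch below avoids ScalaTest and uses a hypothetical `FailedCheck` exception to show what such a depth selects; the depth of `1` (the helper's caller) is an illustrative choice, not a claim about what the PR should use.

```scala
// Hypothetical exception that records which stack frame a failure should be attributed to.
// Not ScalaTest's TestFailedException; it only illustrates the meaning of a stack-depth argument.
final class FailedCheck(message: String, val stackDepth: Int) extends RuntimeException(message) {
  // On typical JVMs, frame 0 is the method that constructed the exception;
  // larger depths walk outward toward its callers.
  def failedFrame: Option[StackTraceElement] = {
    val frames = getStackTrace
    if (stackDepth >= 0 && stackDepth < frames.length) Some(frames(stackDepth)) else None
  }
}

object Checks {
  // By-name predicate, as in the helper under review: it is evaluated inside this method,
  // so depth 0 is this helper and depth 1 is whoever called it.
  def assertCanGetProcessingTime(predicate: => Boolean): Unit =
    if (!predicate) throw new FailedCheck("Could not get processing time", 1)
}
```

A hard-coded depth silently points at the wrong line if the helper gains another level of indirection, which is one reason to question a magic number here.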
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145282388

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---
@@ -61,6 +61,10 @@ case class FlatMapGroupsWithStateExec(
   private val isTimeoutEnabled = timeoutConf != NoTimeout
   val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled)
+  val watermarkPresent = child.output.exists {
--- End diff --

this is cleaner, but doesn't `eventTimeWatermark.isDefined` imply that the watermark is present?
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145282515

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala ---
@@ -119,32 +116,34 @@ private[sql] class GroupStateImpl[S] private(
     timeoutTimestamp = timestampMs
   }
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
   }
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime)
   }
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
   }
+  override def getCurrentWatermarkMs(): Long = {
+    if (!watermarkPresent) {
+      throw new UnsupportedOperationException(
+        "Cannot get event time watermark timestamp without enabling setting watermark before " +
--- End diff --

`without enabling setting watermark` sounds too convoluted. You probably meant `without setting watermark`?
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145282877

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,122 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
   /** Remove this state. */
   def remove(): Unit
   /**
    * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
    */
   def hasTimedOut: Boolean
+
   /**
    * Set the timeout duration in ms for this key.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'durationMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(durationMs: Long): Unit
+
   /**
    * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(duration: String): Unit
-  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time.
    * This timestamp cannot be older than the current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
+  @throws[IllegalArgumentException](
+    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
+  @throws[UnsupportedOperationException](
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutTimestamp(timestampMs: Long): Unit
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
    * duration as a string (e.g. "1 hour", "2 days", etc.).
    * The final timestamp (including the additional duration) cannot be older than the
    * current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no side
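The string overloads (`setTimeoutDuration(duration: String)` and the `additionalDuration` variants) accept values such as "1 hour" or "2 days". The parser below is a deliberately small, hypothetical stand-in that only handles `<count> <unit>` strings; as far as we know, Spark parses these strings as calendar intervals, which accept more forms than this sketch does.

```scala
// Hypothetical parser for duration strings such as "1 hour" or "2 days".
// Only "<count> <unit>" is supported; this is not Spark's parsing logic.
object DurationSketch {
  // Milliseconds per supported unit (singular form).
  private val unitMs: Map[String, Long] = Map(
    "millisecond" -> 1L,
    "second" -> 1000L,
    "minute" -> 60L * 1000L,
    "hour" -> 60L * 60L * 1000L,
    "day" -> 24L * 60L * 60L * 1000L,
    "week" -> 7L * 24L * 60L * 60L * 1000L)

  def parseDurationMs(duration: String): Long =
    duration.trim.toLowerCase.split("\\s+") match {
      case Array(count, unit) if count.nonEmpty && count.forall(_.isDigit) =>
        // Accept both "hour" and "hours" by dropping a trailing "s".
        unitMs.get(unit.stripSuffix("s")) match {
          case Some(ms) => count.toLong * ms
          case None => throw new IllegalArgumentException(s"Unknown unit in duration: '$duration'")
        }
      case _ =>
        throw new IllegalArgumentException(s"Provided duration ($duration) is not valid")
    }
}
```

Invalid input raises `IllegalArgumentException`, matching the `@throws` tag on `setTimeoutDuration(duration: String)`.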
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145202164

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
   /** Remove this state. */
   def remove(): Unit
   /**
    * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
    */
   def hasTimedOut: Boolean
+
   /**
    * Set the timeout duration in ms for this key.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'durationMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(durationMs: Long): Unit
+
   /**
    * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(duration: String): Unit
-  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time.
    * This timestamp cannot be older than the current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
+  @throws[IllegalArgumentException](
+    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
+  @throws[UnsupportedOperationException](
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutTimestamp(timestampMs: Long): Unit
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
    * duration as a string (e.g. "1 hour", "2 days", etc.).
    * The final timestamp (including the additional duration) cannot be older than the
    * current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no side ef
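The event time rule, that a timeout timestamp "cannot be older than the current watermark", can be sketched the same way. `TimestampTimeoutSketch` is hypothetical: it gates on event time timeout as the `@note` states, models a batch query as having no watermark (`None`), and takes the additional duration in milliseconds rather than as a string to keep it short.

```scala
// Hypothetical sketch of the event-time timeout checks described in the scaladoc:
// the timestamp must be positive and, in a streaming query, not older than the watermark.
// `watermarkMs = None` models a batch query, where the call has no effect.
final class TimestampTimeoutSketch(eventTimeTimeoutEnabled: Boolean, watermarkMs: Option[Long]) {
  private var timeoutTimestampMs: Option[Long] = None

  def timeoutTimestamp: Option[Long] = timeoutTimestampMs

  def setTimeoutTimestamp(timestampMs: Long): Unit = {
    if (!eventTimeTimeoutEnabled) {
      throw new UnsupportedOperationException(
        "Cannot set timeout timestamp without enabling event time timeout in " +
          "[map|flatMap]GroupsWithState")
    }
    if (timestampMs <= 0) {
      throw new IllegalArgumentException("Timeout timestamp must be positive")
    }
    watermarkMs match {
      case Some(wm) if timestampMs < wm =>
        throw new IllegalArgumentException(
          s"Timeout timestamp ($timestampMs) cannot be earlier than the current watermark ($wm)")
      case Some(_) => timeoutTimestampMs = Some(timestampMs)
      case None => () // batch query: no effect
    }
  }

  // Variant with an additional duration (in ms here); the final timestamp is what gets checked.
  def setTimeoutTimestamp(timestampMs: Long, additionalMs: Long): Unit =
    setTimeoutTimestamp(timestampMs + additionalMs)
}
```

Checking the final timestamp, not the base timestamp, matches the doc's wording "The final timestamp (including the additional duration) cannot be older than the current watermark."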
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r145202185

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
   /** Remove this state. */
   def remove(): Unit
   /**
    * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
    */
   def hasTimedOut: Boolean
+
--- End diff --

That's intentional; it makes it visually easier to read.
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144991277

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
@@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
       }
     }
+  test("GroupState - getCurrentWatermarkMs") {
+    def assertWrongTimeoutError(test: => Unit): Unit = {
+      val e = intercept[UnsupportedOperationException] { test }
+      assert(e.getMessage.contains(
+        "Cannot get event time watermark timestamp without enabling event time timeout"))
+    }
+    def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = {
+      GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false)
+    }
+    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+      GroupStateImpl.createForBatch(timeoutConf)
+    }
+    // Tests for getCurrentWatermarkMs in streaming queries
+    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() }
+    assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() }
+    assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000)
+    assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000)
+    // Tests for getCurrentWatermarkMs in batch queries
+    assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() }
+    assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() }
+    assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1)
+  }
+  test("GroupState - getCurrentProcessingTimeMs") {
+    def assertWrongTimeoutError(test: => Unit): Unit = {
+      val e = intercept[UnsupportedOperationException] { test }
+      assert(e.getMessage.contains(
+        "Cannot get processing time timestamp without enabling processing time timeout"))
+    }
+    def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = {
+      GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false)
+    }
+    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+      GroupStateImpl.createForBatch(timeoutConf)
+    }
+    // Tests for getCurrentWatermarkMs in streaming queries
+    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() }
+    assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() }
+    assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000)
+    assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000)
+    // Tests for getCurrentWatermarkMs in batch queries
+    assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() }
--- End diff --

Argh, my bad. I didn't realize your comment was about the code comment.
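The table these tests encode (one getter per timeout mode, with `-1` as the watermark in a batch query) can be sketched independently of Spark. The names below (`TimeoutConf`, `TimeGettersSketch`, `TimeGetters`) are hypothetical and only mirror the timeout-conf gating shown in the diff under test.

```scala
// Hypothetical model of the three timeout configurations exercised by the tests.
sealed trait TimeoutConf
case object NoTimeoutConf extends TimeoutConf
case object ProcessingTimeConf extends TimeoutConf
case object EventTimeConf extends TimeoutConf

// Getters gated on the timeout configuration, as in the revision under test.
final class TimeGettersSketch(conf: TimeoutConf, processingTimeMs: Long, watermarkMs: Long) {
  def getCurrentWatermarkMs(): Long = {
    if (conf != EventTimeConf) {
      throw new UnsupportedOperationException(
        "Cannot get event time watermark timestamp without enabling event time timeout in " +
          "[map|flatMap]GroupsWithState")
    }
    watermarkMs
  }

  def getCurrentProcessingTimeMs(): Long = {
    if (conf != ProcessingTimeConf) {
      throw new UnsupportedOperationException(
        "Cannot get processing time timestamp without enabling processing time timeout in " +
          "[map|flatMap]GroupsWithState")
    }
    processingTimeMs
  }
}

object TimeGetters {
  // -1 stands for "no timestamp", matching the batch expectation (=== -1) in the tests.
  val NoTimestamp: Long = -1L

  def streaming(conf: TimeoutConf, procTimeMs: Long, watermarkMs: Long): TimeGettersSketch =
    new TimeGettersSketch(conf, procTimeMs, watermarkMs)

  def batch(conf: TimeoutConf): TimeGettersSketch =
    new TimeGettersSketch(conf, NoTimestamp, NoTimestamp)
}
```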
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144992379

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
   /** Remove this state. */
   def remove(): Unit
   /**
    * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
    */
   def hasTimedOut: Boolean
+
   /**
    * Set the timeout duration in ms for this key.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'durationMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(durationMs: Long): Unit
+
   /**
    * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(duration: String): Unit
-  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time.
    * This timestamp cannot be older than the current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
+  @throws[IllegalArgumentException](
+    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
+  @throws[UnsupportedOperationException](
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutTimestamp(timestampMs: Long): Unit
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
    * duration as a string (e.g. "1 hour", "2 days", etc.).
    * The final timestamp (including the additional duration) cannot be older than the
    * current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *       `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no side
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144992332

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
   /** Remove this state. */
   def remove(): Unit
   /**
    * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
    */
   def hasTimedOut: Boolean
+
--- End diff --

nit: extra line
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144992239 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala --- @@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private( timeoutTimestamp = timestampMs } - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs) } - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestamp: Date): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(timestamp.getTime) } - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration)) } + override def getCurrentWatermarkMs(): Long = { +if (timeoutConf != EventTimeTimeout) { + throw new UnsupportedOperationException( +"Cannot get event time watermark timestamp without enabling event time timeout in " + + "[map|flatMap]GroupsWithState") +} +eventTimeWatermarkMs + } + + override def 
getCurrentProcessingTimeMs(): Long = { +if (timeoutConf != ProcessingTimeTimeout) { + throw new UnsupportedOperationException( +"Cannot get processing time timestamp without enabling processing time timeout in " + + "map|flatMap]GroupsWithState") --- End diff -- nit: `[`
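A minimal stand-alone sketch of the guard pattern in the two new getters: each one checks the configured timeout before returning its timestamp. `TimeoutConf`, `SketchGroupState` and the constructor fields are hypothetical stand-ins that mirror the quoted `GroupStateImpl` code; they are not Spark's classes. The messages use the `[map|flatMap]` spelling the nit asks for.

```scala
// Hypothetical stand-ins for Spark's GroupStateTimeout confs (not the real classes).
sealed trait TimeoutConf
case object NoTimeout extends TimeoutConf
case object ProcessingTimeTimeout extends TimeoutConf
case object EventTimeTimeout extends TimeoutConf

// Hypothetical model of the state object: it only carries the timeout conf and
// the two timestamps needed to illustrate the getter guards.
final class SketchGroupState(
    timeoutConf: TimeoutConf,
    batchProcessingTimeMs: Long,
    eventTimeWatermarkMs: Long) {

  // The watermark is only exposed when event time timeout is enabled.
  def getCurrentWatermarkMs(): Long = {
    if (timeoutConf != EventTimeTimeout) {
      throw new UnsupportedOperationException(
        "Cannot get event time watermark timestamp without enabling event time timeout in " +
          "[map|flatMap]GroupsWithState")
    }
    eventTimeWatermarkMs
  }

  // The processing time is only exposed when processing time timeout is enabled.
  def getCurrentProcessingTimeMs(): Long = {
    if (timeoutConf != ProcessingTimeTimeout) {
      throw new UnsupportedOperationException(
        "Cannot get processing time timestamp without enabling processing time timeout in " +
          "[map|flatMap]GroupsWithState")
    }
    batchProcessingTimeMs
  }
}
```

Throwing instead of returning a sentinel means a caller that forgot to enable the matching timeout fails loudly rather than silently reading a meaningless value.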
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144992271 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala --- @@ -187,7 +190,7 @@ private[sql] class GroupStateImpl[S] private( if (timeoutConf != EventTimeTimeout) { throw new UnsupportedOperationException( "Cannot set timeout timestamp without enabling event time timeout in " + - "map/flatMapGroupsWithState") + "map|flatMapGroupsWithState") --- End diff -- `[map|flatMap]`
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144989072 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala --- @@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf } } + test("GroupState - getCurrentWatermarkMs") { +def assertWrongTimeoutError(test: => Unit): Unit = { + val e = intercept[UnsupportedOperationException] { test } + assert(e.getMessage.contains( +"Cannot get event time watermark timestamp without enabling event time timeout")) +} + +def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = { + GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false) +} + +def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = { + GroupStateImpl.createForBatch(timeoutConf) +} + +// Tests for getCurrentWatermarkMs in streaming queries +assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() } +assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() } +assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000) +assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000) + +// Tests for getCurrentWatermarkMs in batch queries +assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() } +assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() } +assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1) + } + + test("GroupState - getCurrentProcessingTimeMs") { +def assertWrongTimeoutError(test: => Unit): Unit = { + val e = intercept[UnsupportedOperationException] { test } + assert(e.getMessage.contains( +"Cannot get processing time timestamp without enabling processing time timeout")) +} + +def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = { + 
GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false) +} + +def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = { + GroupStateImpl.createForBatch(timeoutConf) +} + +// Tests for getCurrentWatermarkMs in streaming queries +assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() } +assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() } +assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000) +assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000) + +// Tests for getCurrentWatermarkMs in batch queries +assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() } --- End diff -- The comment above says otherwise; that's why I was confused.
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144972393 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala --- @@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf } } + test("GroupState - getCurrentWatermarkMs") { +def assertWrongTimeoutError(test: => Unit): Unit = { + val e = intercept[UnsupportedOperationException] { test } + assert(e.getMessage.contains( +"Cannot get event time watermark timestamp without enabling event time timeout")) +} + +def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = { + GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false) +} + +def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = { + GroupStateImpl.createForBatch(timeoutConf) +} + +// Tests for getCurrentWatermarkMs in streaming queries +assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() } +assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() } +assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000) +assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000) + +// Tests for getCurrentWatermarkMs in batch queries +assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() } +assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() } +assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1) + } + + test("GroupState - getCurrentProcessingTimeMs") { +def assertWrongTimeoutError(test: => Unit): Unit = { + val e = intercept[UnsupportedOperationException] { test } + assert(e.getMessage.contains( +"Cannot get processing time timestamp without enabling processing time timeout")) +} + +def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = { + 
GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false) +} + +def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = { + GroupStateImpl.createForBatch(timeoutConf) +} + +// Tests for getCurrentWatermarkMs in streaming queries +assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() } +assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() } +assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000) +assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000) + +// Tests for getCurrentWatermarkMs in batch queries +assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() } --- End diff -- This is testing `getCurrentProcessingTimeMs`, so yeah, that's by design.
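The two tests in this diff each define an almost identical `assertWrongTimeoutError` that differs only in the expected message. Below is a sketch of a single parameterized helper; it uses plain `try`/`catch` instead of ScalaTest's `intercept` so it runs without a test framework, and `TestHelpers.assertThrowsWithMessage` is a hypothetical name, not something in the PR.

```scala
import scala.reflect.ClassTag
import scala.util.control.NonFatal

object TestHelpers {
  // Hypothetical replacement for the per-test assertWrongTimeoutError helpers:
  // runs `body`, requires it to throw an E whose message contains `expected`,
  // and returns the caught exception.
  def assertThrowsWithMessage[E <: Throwable](expected: String)(body: => Any)(
      implicit tag: ClassTag[E]): E = {
    val caught: Option[Throwable] =
      try { body; None } catch { case NonFatal(t) => Some(t) }
    caught match {
      case None =>
        throw new AssertionError(s"expected ${tag.runtimeClass.getName}, but nothing was thrown")
      case Some(t) if !tag.runtimeClass.isInstance(t) =>
        throw new AssertionError(s"expected ${tag.runtimeClass.getName}, got $t")
      case Some(t) =>
        val msg = Option(t.getMessage).getOrElse("")
        if (!msg.contains(expected)) {
          throw new AssertionError(s"message '$msg' does not contain '$expected'")
        }
        t.asInstanceOf[E]
    }
  }
}
```

In the suite, each check would then read like `assertThrowsWithMessage[UnsupportedOperationException]("Cannot get processing time timestamp without enabling processing time timeout") { ... }`, with the expected text stated once at the call site instead of in a per-test helper.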
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144972248 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala --- @@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] { /** Get the state value as a scala Option. */ def getOption: Option[S] - /** - * Update the value of the state. Note that `null` is not a valid value, and it throws - * IllegalArgumentException. - */ - @throws[IllegalArgumentException]("when updating with null") + /** Update the value of the state. */ def update(newState: S): Unit /** Remove this state. */ def remove(): Unit /** * Whether the function has been called because the key has timed out. - * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`. + * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`. */ def hasTimedOut: Boolean + /** * Set the timeout duration in ms for this key. * - * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Processing time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. */ @throws[IllegalArgumentException]("if 'durationMs' is not positive") - @throws[IllegalStateException]("when state is either not initialized, or already removed") @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") +"if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutDuration(durationMs: Long): Unit + /** * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc. * - * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. 
+ * @note [[GroupStateTimeout Processing time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. */ @throws[IllegalArgumentException]("if 'duration' is not a valid duration") - @throws[IllegalStateException]("when state is either not initialized, or already removed") @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") +"if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutDuration(duration: String): Unit - @throws[IllegalArgumentException]("if 'timestampMs' is not positive") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + /** * Set the timeout timestamp for this key as milliseconds in epoch time. * This timestamp cannot be older than the current watermark. * - * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Event time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. 
*/ + @throws[IllegalArgumentException]( +"if 'timestampMs' is not positive or less than the current watermark in a streaming query") + @throws[UnsupportedOperationException]( +"if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutTimestamp(timestampMs: Long): Unit - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + /** * Set the timeout timestamp for this key as milliseconds in epoch time and an additional * duration as a string (e.g. "1 hour", "2 days", etc.). * The final timestamp (including the additional duration) cannot be older than the * current watermark. * - * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Event time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no side ef
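The doc comments above pin down the contract of `setTimeoutDuration`: a non-positive duration is rejected with `IllegalArgumentException`, a missing processing time timeout with `UnsupportedOperationException`, and the call has no effect in a batch query. Below is a minimal sketch of that contract. It assumes, for illustration only, that the deadline is recorded as the batch processing time plus the duration; `DurationTimeoutSketch`, `isStreaming` and `timeoutTimestampMs` are hypothetical names.

```scala
// Hypothetical model of the setTimeoutDuration contract described in the doc comments.
final class DurationTimeoutSketch(
    processingTimeTimeoutEnabled: Boolean,
    isStreaming: Boolean,
    batchProcessingTimeMs: Long) {

  // None until a timeout has been set; stays None in batch queries.
  private var timeoutTimestamp: Option[Long] = None
  def timeoutTimestampMs: Option[Long] = timeoutTimestamp

  def setTimeoutDuration(durationMs: Long): Unit = {
    if (!processingTimeTimeoutEnabled) {
      throw new UnsupportedOperationException(
        "Cannot set timeout duration without enabling processing time timeout in " +
          "[map|flatMap]GroupsWithState")
    }
    if (durationMs <= 0) {
      throw new IllegalArgumentException("Timeout duration must be positive")
    }
    // Assumed semantics: the deadline is relative to the current batch's processing time.
    // In a batch query the call is accepted but has no effect.
    if (isStreaming) timeoutTimestamp = Some(batchProcessingTimeMs + durationMs)
  }
}
```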
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144963002 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala --- @@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private( timeoutTimestamp = timestampMs } - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs) } - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestamp: Date): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(timestamp.getTime) } - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration)) } + override def getCurrentWatermarkMs(): Long = { +if (timeoutConf != EventTimeTimeout) { + throw new UnsupportedOperationException( +"Cannot get event time watermark timestamp without enabling event time timeout in " + + "[map/flatMap]GroupsWithState") --- End diff -- right. 
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144936080 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala --- @@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private( timeoutTimestamp = timestampMs } - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs) } - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestamp: Date): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(timestamp.getTime) } - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration)) } + override def getCurrentWatermarkMs(): Long = { +if (timeoutConf != EventTimeTimeout) { + throw new UnsupportedOperationException( +"Cannot get event time watermark timestamp without enabling event time timeout in " + + "[map/flatMap]GroupsWithState") --- End diff -- uber nit: should we be consistent between `/` and `|`? In half the places we use `[map/flatMap]` and in the other half `[map|flatMap]`. I prefer `|`.
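One way to settle the `/` versus `|` inconsistency is to build every such message from a single constant, so the spelling cannot drift between call sites. `GroupsWithStateMessages`, `missingTimeout` and `ExampleMessages` are hypothetical names for illustration; the PR itself spells each message inline.

```scala
// Hypothetical helper: the operator names are spelled once, in the `|` form.
object GroupsWithStateMessages {
  val OperatorNames: String = "[map|flatMap]GroupsWithState"

  // Produces "Cannot <action> without enabling <timeoutKind> timeout in [map|flatMap]GroupsWithState".
  def missingTimeout(action: String, timeoutKind: String): String =
    s"Cannot $action without enabling $timeoutKind timeout in $OperatorNames"
}

// Example call sites reproducing the three messages discussed in this thread.
object ExampleMessages {
  val watermarkMsg: String =
    GroupsWithStateMessages.missingTimeout("get event time watermark timestamp", "event time")
  val processingTimeMsg: String =
    GroupsWithStateMessages.missingTimeout("get processing time timestamp", "processing time")
  val setTimestampMsg: String =
    GroupsWithStateMessages.missingTimeout("set timeout timestamp", "event time")
}
```

The generated strings still contain the prefixes the suite checks with `contains`, so the tests would not need to change.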
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144936653 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala --- @@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf } } + test("GroupState - getCurrentWatermarkMs") { +def assertWrongTimeoutError(test: => Unit): Unit = { + val e = intercept[UnsupportedOperationException] { test } + assert(e.getMessage.contains( +"Cannot get event time watermark timestamp without enabling event time timeout")) +} + +def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = { + GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false) +} + +def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = { + GroupStateImpl.createForBatch(timeoutConf) +} + +// Tests for getCurrentWatermarkMs in streaming queries +assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() } +assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() } +assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000) +assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000) + +// Tests for getCurrentWatermarkMs in batch queries +assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() } +assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() } +assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1) + } + + test("GroupState - getCurrentProcessingTimeMs") { +def assertWrongTimeoutError(test: => Unit): Unit = { + val e = intercept[UnsupportedOperationException] { test } + assert(e.getMessage.contains( +"Cannot get processing time timestamp without enabling processing time timeout")) +} + +def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = { + 
GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false) +} + +def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = { + GroupStateImpl.createForBatch(timeoutConf) +} + +// Tests for getCurrentWatermarkMs in streaming queries +assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() } +assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() } +assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000) +assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000) + +// Tests for getCurrentWatermarkMs in batch queries +assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() } --- End diff -- not actually using `getCurrentWatermarkMs`
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144935264 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala --- @@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] { /** Get the state value as a scala Option. */ def getOption: Option[S] - /** - * Update the value of the state. Note that `null` is not a valid value, and it throws - * IllegalArgumentException. - */ - @throws[IllegalArgumentException]("when updating with null") + /** Update the value of the state. */ def update(newState: S): Unit /** Remove this state. */ def remove(): Unit /** * Whether the function has been called because the key has timed out. - * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`. + * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`. */ def hasTimedOut: Boolean + /** * Set the timeout duration in ms for this key. * - * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Processing time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. */ @throws[IllegalArgumentException]("if 'durationMs' is not positive") - @throws[IllegalStateException]("when state is either not initialized, or already removed") @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") +"if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutDuration(durationMs: Long): Unit + /** * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc. * - * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. 
+ * @note [[GroupStateTimeout Processing time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. */ @throws[IllegalArgumentException]("if 'duration' is not a valid duration") - @throws[IllegalStateException]("when state is either not initialized, or already removed") @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") +"if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutDuration(duration: String): Unit - @throws[IllegalArgumentException]("if 'timestampMs' is not positive") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + /** * Set the timeout timestamp for this key as milliseconds in epoch time. * This timestamp cannot be older than the current watermark. * - * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Event time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no effect when used in a batch query. 
*/ + @throws[IllegalArgumentException]( +"if 'timestampMs' is not positive or less than the current watermark in a streaming query") + @throws[UnsupportedOperationException]( +"if processing time timeout has not been enabled in [map|flatMap]GroupsWithState") def setTimeoutTimestamp(timestampMs: Long): Unit - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") + /** * Set the timeout timestamp for this key as milliseconds in epoch time and an additional * duration as a string (e.g. "1 hour", "2 days", etc.). * The final timestamp (including the additional duration) cannot be older than the * current watermark. * - * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`. + * @note [[GroupStateTimeout Event time timeout]] must be enabled in + * `[map/flatmap]GroupsWithState` for calling this method. + * @note This method has no side
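For event time timeout, the same diff documents that `setTimeoutTimestamp` rejects a timestamp that is not positive, or that is older than the current watermark in a streaming query, and that the `Date` overloads accept an extra duration. A stand-alone sketch of that validation follows. It takes the extra duration as milliseconds rather than a string such as "1 hour" to avoid guessing at Spark's duration parser; `TimestampTimeoutSketch` and its members are hypothetical.

```scala
import java.util.Date

// Hypothetical model of the event-time setTimeoutTimestamp contract.
final class TimestampTimeoutSketch(
    eventTimeTimeoutEnabled: Boolean,
    isStreaming: Boolean,
    eventTimeWatermarkMs: Long) {

  private var timeoutTimestamp: Option[Long] = None
  def timeoutTimestampMs: Option[Long] = timeoutTimestamp

  def setTimeoutTimestamp(timestampMs: Long): Unit = {
    if (!eventTimeTimeoutEnabled) {
      throw new UnsupportedOperationException(
        "Cannot set timeout timestamp without enabling event time timeout in " +
          "[map|flatMap]GroupsWithState")
    }
    if (timestampMs <= 0) {
      throw new IllegalArgumentException("Timeout timestamp must be positive")
    }
    // The watermark check applies to streaming queries; batch calls are ignored.
    if (isStreaming) {
      if (timestampMs < eventTimeWatermarkMs) {
        throw new IllegalArgumentException(
          s"Timeout timestamp ($timestampMs) cannot be earlier than the " +
            s"current watermark ($eventTimeWatermarkMs)")
      }
      timeoutTimestamp = Some(timestampMs)
    }
  }

  // Date overload with an extra duration, mirroring timestamp.getTime + parsed duration.
  def setTimeoutTimestamp(timestamp: Date, additionalDurationMs: Long): Unit =
    setTimeoutTimestamp(timestamp.getTime + additionalDurationMs)
}
```

Checking the final timestamp (after adding the extra duration) against the watermark matches the doc comment's wording that "the final timestamp (including the additional duration) cannot be older than the current watermark".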
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19495#discussion_r144736325 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala --- @@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private( timeoutTimestamp = timestampMs } - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs) } - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestamp: Date): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(timestamp.getTime) } - @throws[IllegalArgumentException]("if 'additionalDuration' is invalid") - @throws[IllegalStateException]("when state is either not initialized, or already removed") - @throws[UnsupportedOperationException]( -"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query") override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = { checkTimeoutTimestampAllowed() setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration)) } + override def getCurrentWatermarkMs(): Long = { +if (timeoutConf != EventTimeTimeout) { + throw new UnsupportedOperationException( +"Cannot get event time watermark timestamp without enabling event time timeout in " + + "[map/flatMap]GroupsWithState") +} +eventTimeWatermarkMs + } + + override def 
getCurrentProcessingTimeMs(): Long = { +if (timeoutConf != ProcessingTimeTimeout) { + throw new UnsupportedOperationException( +"Cannot get processing time timestamp without enabling processing time timeout in " + + "map/flatMap]GroupsWithState") --- End diff -- `map/flatMap]` -> `[map/flatMap]`
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/19495

    [SPARK-22278][SS] Expose current event time watermark and current processing time in GroupState

## What changes were proposed in this pull request?

Complex state-updating and/or timeout-handling logic in mapGroupsWithState functions may require making decisions based on the current event-time watermark and/or processing time. Currently, you can use the SQL function `current_timestamp` to get the current processing time, but it has to be inserted into every row with a select and then passed through the encoder, which isn't efficient. Furthermore, there is no way to get the current watermark.

This PR exposes both of them through the GroupState API. It also cleans up some of the GroupState docs.

## How was this patch tested?

New unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-22278

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19495.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19495

commit c9a042e2f0228584f6a3f643cfac412c73ed98d7
Author: Tathagata Das
Date: 2017-10-10T00:01:02Z

    Expose event time watermark in the GorupState

commit 67114ab59f5a8d79fbe66b7deb93869f656346b9
Author: Tathagata Das
Date: 2017-10-14T00:16:08Z

    Exposed processing time
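To make the PR's motivation concrete, here is a sketch of the kind of decision it has in mind. With the watermark available from the state object, an update function can tell whether a key's would-be timeout has already fallen behind the watermark (which `setTimeoutTimestamp` documents as not allowed) and close the key immediately instead. The function is plain Scala with hypothetical names (`SessionSketch`, `decide`, `KeepUntil`, `CloseNow`). In a real query, the `watermarkMs` argument would come from `state.getCurrentWatermarkMs()`, which per the guard in this diff requires event time timeout to be enabled.

```scala
object SessionSketch {
  // Hypothetical outcome of one update call for a session-like key.
  sealed trait SessionAction
  final case class KeepUntil(timeoutTimestampMs: Long) extends SessionAction
  final case class CloseNow(lastEventMs: Long) extends SessionAction

  // Decides what to do with a key given its latest event time, an inactivity gap,
  // and the current event time watermark.
  def decide(lastEventMs: Long, gapMs: Long, watermarkMs: Long): SessionAction = {
    val candidateTimeoutMs = lastEventMs + gapMs
    // A timeout older than the watermark would be rejected by setTimeoutTimestamp,
    // so a key whose deadline has already passed is closed right away instead.
    if (candidateTimeoutMs < watermarkMs) CloseNow(lastEventMs)
    else KeepUntil(candidateTimeoutMs)
  }
}
```

Without the new getter, the only way to make this decision inside the function would be to carry a timestamp through every row, which is the inefficiency the description points out.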