Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144992239
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
 ---
    @@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private(
         timeoutTimestamp = timestampMs
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
       override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: 
String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
       }
     
    -  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
       override def setTimeoutTimestamp(timestamp: Date): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime)
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
       override def setTimeoutTimestamp(timestamp: Date, additionalDuration: 
String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime + 
parseDuration(additionalDuration))
       }
     
    +  override def getCurrentWatermarkMs(): Long = {
    +    if (timeoutConf != EventTimeTimeout) {
    +      throw new UnsupportedOperationException(
    +        "Cannot get event time watermark timestamp without enabling event 
time timeout in " +
    +          "[map|flatMap]GroupsWithState")
    +    }
    +    eventTimeWatermarkMs
    +  }
    +
    +  override def getCurrentProcessingTimeMs(): Long = {
    +    if (timeoutConf != ProcessingTimeTimeout) {
    +      throw new UnsupportedOperationException(
    +        "Cannot get processing time timestamp without enabling processing 
time timeout in " +
    +          "map|flatMap]GroupsWithState")
    --- End diff --
    
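    nit: the opening `[` is missing in this error message — it should read 
`[map|flatMap]GroupsWithState`, matching the message in `getCurrentWatermarkMs()` above.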


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to