GitHub user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21701#discussion_r201071243
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala ---
    @@ -20,15 +20,66 @@ package org.apache.spark.sql.execution.streaming
     import scala.collection.mutable
     
     import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.RuntimeConfig
     import org.apache.spark.sql.execution.SparkPlan
    +import org.apache.spark.sql.internal.SQLConf
     
    -class WatermarkTracker extends Logging {
    +/**
    + * Policy to define how to choose a new global watermark value if there are
    + * multiple watermark operators in a streaming query.
    + */
    +sealed trait MultipleWatermarkPolicy {
    +  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
    +}
    +
    +object MultipleWatermarkPolicy {
    +  val DEFAULT_POLICY_NAME = "min"
    +
    +  def apply(policyName: String): MultipleWatermarkPolicy = {
    +    policyName.toLowerCase match {
    +      case DEFAULT_POLICY_NAME => MinWatermark
    +      case "max" => MaxWatermark
    +      case _ =>
    +        throw new IllegalArgumentException(s"Could not recognize watermark policy '$policyName'")
    +    }
    +  }
    +}
    +
    +/**
    + * Policy to choose the *min* of the operator watermark values as the global watermark value.
    + * Note that this is the safe (hence default) policy as the global watermark will advance
    + * only if all the individual operator watermarks have advanced. In other words, in a
    + * streaming query with multiple input streams and watermarks defined on all of them,
    + * the global watermark will advance as slowly as the slowest input. So if there is watermark
    + * based state cleanup or late-data dropping, then this policy is the most conservative one.
    + */
    +case object MinWatermark extends MultipleWatermarkPolicy {
    +  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    +    if (operatorWatermarks.nonEmpty) operatorWatermarks.min else 0
    --- End diff --
    
    It took me a while to figure out how returning `0` works here, and then I
    realized `operatorWatermarks` is never empty. Should we add an assertion
    rather than returning `0`?
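
For illustration, a minimal sketch of what the assertion-based variant might look like. This is not the code from the PR: the trait is re-declared only to keep the snippet self-contained, the assertion message is made up, and it assumes callers really never pass an empty `operatorWatermarks`.

```scala
// Hypothetical sketch, not the merged implementation.
// The trait is repeated here only so the snippet compiles on its own.
sealed trait MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
}

case object MinWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    // Fail fast on an empty input instead of silently returning 0,
    // which makes the "never empty" assumption explicit to readers.
    assert(operatorWatermarks.nonEmpty, "operatorWatermarks must not be empty")
    operatorWatermarks.min
  }
}
```

With this shape, an unexpected empty input surfaces as an assertion failure rather than as a global watermark of `0`.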


---
