Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21701#discussion_r201071243 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala --- @@ -20,15 +20,66 @@ package org.apache.spark.sql.execution.streaming import scala.collection.mutable import org.apache.spark.internal.Logging +import org.apache.spark.sql.RuntimeConfig import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.internal.SQLConf -class WatermarkTracker extends Logging { +/** + * Policy to define how to choose a new global watermark value if there are + * multiple watermark operators in a streaming query. + */ +sealed trait MultipleWatermarkPolicy { + def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long +} + +object MultipleWatermarkPolicy { + val DEFAULT_POLICY_NAME = "min" + + def apply(policyName: String): MultipleWatermarkPolicy = { + policyName.toLowerCase match { + case DEFAULT_POLICY_NAME => MinWatermark + case "max" => MaxWatermark + case _ => + throw new IllegalArgumentException(s"Could not recognize watermark policy '$policyName'") + } + } +} + +/** + * Policy to choose the *min* of the operator watermark values as the global watermark value. + * Note that this is the safe (hence default) policy as the global watermark will advance + * only if all the individual operator watermarks have advanced. In other words, in a + * streaming query with multiple input streams and watermarks defined on all of them, + * the global watermark will advance as slowly as the slowest input. So if there is watermark + * based state cleanup or late-data dropping, then this policy is the most conservative one. + */ +case object MinWatermark extends MultipleWatermarkPolicy { + def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = { + if (operatorWatermarks.nonEmpty) operatorWatermarks.min else 0 --- End diff -- I took a while to figure out how 0 works and then realized `operatorWatermarks ` is always not empty. Should we add an assertion rather than returning `0`?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org