alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1023155184


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -940,22 +1056,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
   def assertPassOnGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false)
+      outputMode: OutputMode = OutputMode.Append()): Unit = {
+    testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = false, outputMode)
   }
 
   def assertFailOnGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = true)
+      outputMode: OutputMode = OutputMode.Append()): Unit = {

Review Comment:
   ditto



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  private def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  private def isStatefulOperationPossiblyEmitLateRows(
+      p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
+    case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+      left.isStreaming && right.isStreaming &&
+        otherCondition.isDefined && hasRangeExprAgainstEventTimeCol(otherCondition.get)
+    // FlatMapGroupsWithState configured with event time
+    case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, _, _, _, _)
+      if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _)
+      if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case a: Aggregate if a.isStreaming && outputMode != InternalOutputModes.Append => true
+    // Since the Distinct node will be replaced with Aggregate by the optimizer rule
+    // [[ReplaceDistinctWithAggregate]], we also need to check every Distinct node here,
+    // treating it as an Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming
+      && outputMode != InternalOutputModes.Append => true
+    case _ => false
+  }
+
+  private def isStreamingStatefulOperation(p: LogicalPlan): Boolean = p match {
+    case s: Aggregate if s.isStreaming => true
+    // Since the Distinct node will be replaced with Aggregate by the optimizer rule
+    // [[ReplaceDistinctWithAggregate]], we also need to check every Distinct node here,
+    // treating it as an Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming => true
+    case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
+    case f: FlatMapGroupsWithState if f.isStreaming => true
+    case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
+    // Deduplicate also works without an event time column in streaming. In such
+    // cases, although Deduplicate is still a stateful operation in a streaming
+    // query, it can be ignored by all the checks below, so let it return false.
+    case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => true
+    case _ => false
+  }
   /**
    * Checks for possible correctness issues in chained stateful operators. The behavior is
    * controlled by the SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
-  def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
-      case Join(left, right, joinType, _, _)
-        if left.isStreaming && right.isStreaming && joinType != Inner => true
-      case f: FlatMapGroupsWithState
-        if f.isStreaming && f.outputMode == OutputMode.Append() => true
-      case _ => false
-    }
-
-    def isStatefulOperation(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate if s.isStreaming => true
-      case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
-      case f: FlatMapGroupsWithState if f.isStreaming => true
-      case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
-      case d: Deduplicate if d.isStreaming => true
-      case _ => false
-    }
-
+  def checkStreamingQueryGlobalWatermarkLimit(plan: LogicalPlan, outputMode: OutputMode): Unit = {

Review Comment:
   a comment on this method would be helpful



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {

Review Comment:
   What does 'Neq' stand for here - 'not equal'? How does this method know that is the case? It seems like it can be applied to any binary expression. Maybe add an assert?



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -940,22 +1056,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
   def assertPassOnGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false)
+      outputMode: OutputMode = OutputMode.Append()): Unit = {

Review Comment:
   I would suggest not making Append a default parameter value but specifying it explicitly in each test for better readability
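
   For illustration, the call sites would then read like this (hypothetical test names and plan values, sketching the suggested style rather than quoting the PR):

   ```scala
   // Output mode spelled out at every call site instead of defaulted:
   assertPassOnGlobalWatermarkLimit(
     "single streaming aggregation",
     streamingAggregationPlan, // some streaming LogicalPlan built in the suite
     OutputMode.Append())

   assertFailOnGlobalWatermarkLimit(
     "stateful operation after streaming aggregation",
     chainedStatefulPlan, // likewise hypothetical
     OutputMode.Append())
   ```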



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {

Review Comment:
   a short comment on the methods may be helpful
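
   For example, based on the logic in the hunk above, a short doc comment could look like this (a sketch, not the PR's actual wording):

   ```scala
   /**
    * Returns true if `e` contains a range comparison (<, <=, > or >=) where either
    * side of the comparison references an event-time column, i.e. an attribute
    * tagged with [[EventTimeWatermark.delayKey]] metadata.
    */
   private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
     case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
       hasEventTimeColNeq(neq)
     case _ => false
   }
   ```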



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -940,22 +1056,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
   def assertPassOnGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false)
+      outputMode: OutputMode = OutputMode.Append()): Unit = {
+    testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = false, outputMode)
   }
 
   def assertFailOnGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure 
= true)
+      outputMode: OutputMode = OutputMode.Append()): Unit = {
+    testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = true, 
outputMode)
   }
 
   def testGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode,
-      expectFailure: Boolean): Unit = {
+      expectFailure: Boolean,

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

