HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1023563189


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -42,40 +43,101 @@ object UnsupportedOperationChecker extends Logging {
   }
 
   /**
-   * Checks for possible correctness issue in chained stateful operators. The 
behavior is
-   * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
-   * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
-   * print a warning message.
+   * Checks if the expression has a event time column
+   * @param exp the expression to be checked
+   * @return true if it is a event time column.
    */
-  def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
-      case Join(left, right, joinType, _, _)
-        if left.isStreaming && right.isStreaming && joinType != Inner => true
-      case f: FlatMapGroupsWithState
-        if f.isStreaming && f.outputMode == OutputMode.Append() => true
-      case _ => false
+  private def hasEventTimeCol(exp: Expression): Boolean = exp.exists {
+    case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+    case _ => false
+  }
+
+  /**
+   * Checks if the expression contains a range comparison, in which
+   * either side of the comparison is an event-time column. This is used for 
checking
+   * stream-stream time interval join.
+   * @param e the expression to be checked
+   * @return true if there is a time-interval join.
+   */
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = {
+    def hasEventTimeColBinaryComp(neq: Expression): Boolean = {
+      val exp = neq.asInstanceOf[BinaryComparison]
+      hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
     }
 
-    def isStatefulOperation(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate if s.isStreaming => true
-      case _ @ Join(left, right, _, _, _) if left.isStreaming && 
right.isStreaming => true
-      case f: FlatMapGroupsWithState if f.isStreaming => true
-      case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
-      case d: Deduplicate if d.isStreaming => true
+    e.exists {
+      case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | 
_: GreaterThan) =>
+        hasEventTimeColBinaryComp(neq)
       case _ => false
     }
+  }
 
-    val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
+  /**
+   * This method, combined with isStatefulOperationPossiblyEmitLateRows, 
determines all disallowed
+   * behaviors in multiple stateful operators.
+   * Concretely, All conditions defined below cannot be followed by any 
streaming stateful
+   * operator as defined in isStatefulOperationPossiblyEmitLateRows.
+   * @param p logical plan to be checked
+   * @param outputMode query output mode
+   * @return true if it is not allowed when followed by any streaming stateful
+   * operator as defined in isStatefulOperationPossiblyEmitLateRows.
+   */
+  private def ifCannotBeFollowedByStatefulOperation(
+      p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
+    case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+      left.isStreaming && right.isStreaming &&
+        otherCondition.isDefined && 
hasRangeExprAgainstEventTimeCol(otherCondition.get)
+    // FlatMapGroupsWithState configured with event time
+    case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, 
_, _, _, _)
+      if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _)
+      if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case a: Aggregate if a.isStreaming && outputMode != 
InternalOutputModes.Append => true
+    // Since the Distinct node will be replaced to Aggregate in the optimizer 
rule
+    // [[ReplaceDistinctWithAggregate]], here we also need to check all 
Distinct node by
+    // assuming it as Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming
+      && outputMode != InternalOutputModes.Append => true
+    case _ => false
+  }
 
+  /**
+   * This method is only used with ifCannotBeFollowedByStatefulOperation.
+   * As can tell from the name, it doesn't contain ALL streaming stateful 
operations,
+   * only the stateful operations that are possible to emit late rows.
+   * for example, a Deduplicate without a event time column is still a 
stateful operation
+   * but of less interested because it won't emit late records because of 
watermark.
+   * @param p the logical plan to be checked
+   * @return true if there is a streaming stateful operation
+   */
+  private def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean 
= p match {
+    case s: Aggregate if s.isStreaming => true
+    // Since the Distinct node will be replaced to Aggregate in the optimizer 
rule
+    // [[ReplaceDistinctWithAggregate]], here we also need to check all 
Distinct node by
+    // assuming it as Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming => true
+    case _ @ Join(left, right, _, _, _) if left.isStreaming && 
right.isStreaming => true
+    case f: FlatMapGroupsWithState if f.isStreaming => true
+    case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
+    // Deduplicate also works without event time column even in streaming,
+    // in such cases, although Dedup is still a stateful operation in a 
streaming
+    // query, it could be ignored in all checks below, so let it return false.
+    case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => 
true
+    case _ => false
+  }
+  /**
+   * Checks for possible correctness issue in chained stateful operators. The 
behavior is
+   * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
+   * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
+   * print a warning message.
+   */
+  def checkStreamingQueryGlobalWatermarkLimit(plan: LogicalPlan, outputMode: 
OutputMode): Unit = {
+    val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
     try {
       plan.foreach { subPlan =>
-        if (isStatefulOperation(subPlan)) {
+        if (isStatefulOperationPossiblyEmitLateRows(subPlan)) {

Review Comment:
   First of all, the method name is not correct. For example, deduplicate never 
produces delayed rows.
   
   Say, if the upstream operator (descendant node for the tree structure) is 
bound to the case of "cannot be followed by stateful operation". Then the 
downstream operator just needs to be "stateful operation" to be disallowed. 
Deduplication without event time column is just an exception. 
   
   Could we rename the method to "isStatefulOperation", and also simplify the 
method doc? Adding exceptional case to the method name doesn't seem to be easy.
   
   (We could be much stricter on semantic and classify several kinds/types from 
"cannot be followed by stateful operation" as there are multiple reasons the 
operator cannot be followed by other stateful operator, but I don't think we 
want to be super exhaustive.)



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -315,15 +298,15 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
     // future.
     expectedMsgs = Seq("Complete"))
 
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Append, Complete)) {

Review Comment:
   The reason to fail here is not because of following aggregation operator. 
It's just because mapGroupsWithState is not supported with append/complete mode.
   
   * Append/Complete mode: `mapGroupsWithState is not supported with Complete 
output mode on a streaming DataFrame/Dataset`
   * Update mode: `Detected pattern of possible 'correctness' issue due to 
global watermark. The query contains stateful operation ...`
   
   That said, test cases here are effectively the same with above test cases, 
without aggregation.
   
   If you have test case for update mode on mapGroupsWithState followed by 
aggregation, then it's OK to just remove this test case. Otherwise, let's just 
change this to test for Update mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to