HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The 
behavior is
    * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   We can remove this line, as we support outer joins as well. We only have an 
issue with stream-stream time-interval joins (of all join types) and 
flatMapGroupsWithState.
   (Arguably, flatMapGroupsWithState should be disallowed in all output modes, 
but I believe we have a separate check for output mode, so this is OK.)



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The 
behavior is
    * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   We can remove this line, as we support outer joins as well. We only have an 
issue with stream-stream time-interval joins (of all join types) and 
flatMapGroupsWithState.
   (Arguably, flatMapGroupsWithState should be disallowed in all output modes, 
but I believe we will have a separate check for output mode, so this is OK.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to