HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The 
behavior is
    * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   We can remove this line as we support outer join as well. We only have issue 
with stream-stream time interval join and flatMapGroupsWithState.
   (Arguably flatMapGroupsWithState with all output modes should be disallowed, 
but I believe we have a separate check for output mode so OK.)



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming

Review Comment:
   maybe missing &&? otherwise this line would be no-op



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -507,15 +507,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
       assertPassOnGlobalWatermarkLimit(
         s"single $joinType join in Append mode",
         streamRelation.join(streamRelation, joinType = RightOuter,
-          condition = Some(attributeWithWatermark === attribute)),

Review Comment:
   The above code comment and related tests are outdated. 
   
   For (time-window) equality join, (inner, left outer, right outer, and full 
outer) should just work with recent fix of late record filtering. For 
time-interval join, all types won't work if there is following stateful 
operator.
   
   That said, we should now have redundant test cases for time-interval join 
specifically to check the different expectation.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The 
behavior is
    * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)
         if left.isStreaming && right.isStreaming && joinType != Inner => true
       case f: FlatMapGroupsWithState
         if f.isStreaming && f.outputMode == OutputMode.Append() => true
-      case _ => false
+      case _ =>

Review Comment:
   nit: unnecessary change



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -157,10 +193,11 @@ object UnsupportedOperationChecker extends Logging {
     // Disallow multiple streaming aggregations
     val aggregates = collectStreamingAggregates(plan)
 
-    if (aggregates.size > 1) {
+    if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {

Review Comment:
   I think it's not only multiple aggregations. Reasoning streaming aggregation 
followed by time window join with update mode would be very tricky. I'd say we 
should simply disallow multiple stateful operators with update and complete 
mode at all.
   (There could be some combinations where they still make sense for 
update/complete mode, but it doesn't seem to be trivial to reason and list up 
allowlist or blocklist.)
   
   cc. @alex-balikov 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The 
behavior is
    * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   I think we should check the stream-stream time interval join in here. I 
understand having separate check helps us to produce better error message, but 
we are disallowing the query at all whereas it could run with the correctness 
check config turned off.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {

Review Comment:
   The method name is too general than what it actually does. Maybe 
`hasRangeExprAgainstEventTimeCol` ?
   Or just define the method from the start of `isStreamStreamIntervalJoin` so 
that it is under the proper implicit context.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -40,378 +40,465 @@ class MultiStatefulOperatorsSuite
   }
 
   test("window agg -> window agg, append mode") {
-    // TODO: SPARK-40940 - Fix the unsupported ops checker to allow chaining 
of stateful ops.
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window($"window", "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 10 to 21: _*),
-        // op1 W (0, 0)
-        // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // output: None
-        // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 21)
-        // agg: None
-        // output: [10, 15) 5, [15, 20) 5
-        // state: [20, 25) 2
-        // op2 W (0, 21)
-        // agg: [10, 20) (2, 10)
-        // output: [10, 20) (2, 10)
-        // state: None
-        CheckNewAnswer((10, 2, 10)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0)),
-
-        AddData(inputData, 10 to 29: _*),
-        // op1 W (21, 21)
-        // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
-        // output: None
-        // state: [20, 25) 7, [25, 30) 5
-        // op2 W (21, 21)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (21, 29)
-        // agg: None
-        // output: [20, 25) 7
-        // state: [25, 30) 5
-        // op2 W (21, 29)
-        // agg: [20, 30) (1, 7)
-        // output: None
-        // state: [20, 30) (1, 7)
-        CheckNewAnswer(),
-        assertNumStateRows(Seq(1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 2)),
-
-        // Move the watermark.
-        AddData(inputData, 30, 31),
-        // op1 W (29, 29)
-        // agg: [30, 35) 2
-        // output: None
-        // state: [25, 30) 5 [30, 35) 2
-        // op2 W (29, 29)
-        // agg: None
-        // output: None
-        // state: [20, 30) (1, 7)
-
-        // no-data batch triggered
-
-        // op1 W (29, 31)
-        // agg: None
-        // output: [25, 30) 5
-        // state: [30, 35) 2
-        // op2 W (29, 31)
-        // agg: [20, 30) (2, 12)
-        // output: [20, 30) (2, 12)
-        // state: None
-        CheckNewAnswer((20, 2, 12)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window($"window", "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 10 to 21: _*),
+      // op1 W (0, 0)
+      // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // output: None
+      // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 21)
+      // agg: None
+      // output: [10, 15) 5, [15, 20) 5
+      // state: [20, 25) 2
+      // op2 W (0, 21)
+      // agg: [10, 20) (2, 10)
+      // output: [10, 20) (2, 10)
+      // state: None
+      CheckNewAnswer((10, 2, 10)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      AddData(inputData, 10 to 29: _*),
+      // op1 W (21, 21)
+      // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
+      // output: None
+      // state: [20, 25) 7, [25, 30) 5
+      // op2 W (21, 21)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (21, 29)
+      // agg: None
+      // output: [20, 25) 7
+      // state: [25, 30) 5
+      // op2 W (21, 29)
+      // agg: [20, 30) (1, 7)
+      // output: None
+      // state: [20, 30) (1, 7)
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 2)),
+
+      // Move the watermark.
+      AddData(inputData, 30, 31),
+      // op1 W (29, 29)
+      // agg: [30, 35) 2
+      // output: None
+      // state: [25, 30) 5 [30, 35) 2
+      // op2 W (29, 29)
+      // agg: None
+      // output: None
+      // state: [20, 30) (1, 7)
+
+      // no-data batch triggered
+
+      // op1 W (29, 31)
+      // agg: None
+      // output: [25, 30) 5
+      // state: [30, 35) 2
+      // op2 W (29, 31)
+      // agg: [20, 30) (2, 12)
+      // output: [20, 30) (2, 12)
+      // state: None
+      CheckNewAnswer((20, 2, 12)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("agg -> agg -> agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window(window_time($"window"), "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .groupBy(window(window_time($"window"), "20 seconds"))
-        .agg(count("*").as("count"), sum("sum").as("sum"))
-        .select(
-          $"window".getField("start").cast("long").as[Long],
-          $"window".getField("end").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 0 to 37: _*),
-        // op1 W (0, 0)
-        // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // output: None
-        // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, 
[25, 30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-        // op3 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 37)
-        // agg: None
-        // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, 
[25, 30) 5, [30, 35) 5
-        // state: [35, 40) 3
-        // op2 W (0, 37)
-        // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) 
(1, 5)
-        // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
-        // state: [30, 40) (1, 5)
-        // op3 W (0, 37)
-        // agg: [0, 20) (2, 20), [20, 40) (1, 10)
-        // output: [0, 20) (2, 20)
-        // state: [20, 40) (1, 10)
-        CheckNewAnswer((0, 20, 2, 20)),
-        assertNumStateRows(Seq(1, 1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
-
-        AddData(inputData, 30 to 60: _*),
-        // op1 W (37, 37)
-        // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in 
effect
-        // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, 
[60, 65) 1
-        // output: None
-        // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, 
[60, 65) 1
-        // op2 W (37, 37)
-        // output: None
-        // state: [30, 40) (1, 5)
-        // op3 W (37, 37)
-        // output: None
-        // state: [20, 40) (1, 10)
-
-        // no-data batch
-        // op1 W (37, 60)
-        // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
-        // state: [60, 65) 1
-        // op2 W (37, 60)
-        // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // state: None
-        // op3 W (37, 60)
-        // agg: [20, 40) (2, 23), [40, 60) (2, 20)
-        // output: [20, 40) (2, 23), [40, 60) (2, 20)
-        // state: None
-
-        CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
-        assertNumStateRows(Seq(0, 0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window(window_time($"window"), "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .groupBy(window(window_time($"window"), "20 seconds"))
+      .agg(count("*").as("count"), sum("sum").as("sum"))
+      .select(
+        $"window".getField("start").cast("long").as[Long],
+        $"window".getField("end").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 0 to 37: _*),
+      // op1 W (0, 0)
+      // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // output: None
+      // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+      // op3 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 37)
+      // agg: None
+      // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5
+      // state: [35, 40) 3
+      // op2 W (0, 37)
+      // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) 
(1, 5)
+      // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
+      // state: [30, 40) (1, 5)
+      // op3 W (0, 37)
+      // agg: [0, 20) (2, 20), [20, 40) (1, 10)
+      // output: [0, 20) (2, 20)
+      // state: [20, 40) (1, 10)
+      CheckNewAnswer((0, 20, 2, 20)),
+      assertNumStateRows(Seq(1, 1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
+
+      AddData(inputData, 30 to 60: _*),
+      // op1 W (37, 37)
+      // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in 
effect
+      // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 
65) 1
+      // output: None
+      // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, 
[60, 65) 1
+      // op2 W (37, 37)
+      // output: None
+      // state: [30, 40) (1, 5)
+      // op3 W (37, 37)
+      // output: None
+      // state: [20, 40) (1, 10)
+
+      // no-data batch
+      // op1 W (37, 60)
+      // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
+      // state: [60, 65) 1
+      // op2 W (37, 60)
+      // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // state: None
+      // op3 W (37, 60)
+      // agg: [20, 40) (2, 23), [40, 60) (2, 20)
+      // output: [20, 40) (2, 23), [40, 60) (2, 20)
+      // state: None
+
+      CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
+      assertNumStateRows(Seq(0, 0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
+    )
   }
 
   test("stream deduplication -> aggregation, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val deduplication = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "10 seconds")
-        .dropDuplicates("value", "eventTime")
-
-      val windowedAggregation = deduplication
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"), sum("value").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(windowedAggregation)(
-        AddData(inputData, 1 to 15: _*),
-        // op1 W (0, 0)
-        // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // deduplicated: None
-        // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 0)
-        // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-        // output: None
-        // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-
-        // no-data batch triggered
-
-        // op1 W (0, 5)
-        // agg: None
-        // output: None
-        // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 5)
-        // agg: None
-        // output: [0, 5) 4
-        // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
-        CheckNewAnswer((0, 4)),
-        assertNumStateRows(Seq(3, 10)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val deduplication = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates("value", "eventTime")
+
+    val windowedAggregation = deduplication
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"), sum("value").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 1 to 15: _*),
+      // op1 W (0, 0)
+      // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // deduplicated: None
+      // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 0)
+      // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+
+      // no-data batch triggered
+
+      // op1 W (0, 5)
+      // agg: None
+      // output: None
+      // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(3, 10)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("join -> window agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val input1 = MemoryStream[Int]
-      val inputDF1 = input1.toDF
-        .withColumnRenamed("value", "value1")
-        .withColumn("eventTime1", timestamp_seconds($"value1"))
-        .withWatermark("eventTime1", "0 seconds")
-
-      val input2 = MemoryStream[Int]
-      val inputDF2 = input2.toDF
-        .withColumnRenamed("value", "value2")
-        .withColumn("eventTime2", timestamp_seconds($"value2"))
-        .withWatermark("eventTime2", "0 seconds")
-
-      val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), 
"inner")
-        .groupBy(window($"eventTime1", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[Int]
+    val inputDF2 = input2.toDF
+      .withColumnRenamed("value", "value2")
+      .withColumn("eventTime2", timestamp_seconds($"value2"))
+      .withWatermark("eventTime2", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), 
"inner")
+      .groupBy(window($"eventTime1", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+    testStream(stream)(
+      MultiAddData(input1, 1 to 4: _*)(input2, 1 to 4: _*),
+
+      // op1 W (0, 0)
+      // join output: (1, 1), (2, 2), (3, 3), (4, 4)
+      // state: (1, 1), (2, 2), (3, 3), (4, 4)
+      // op2 W (0, 0)
+      // agg: [0, 5) 4
+      // output: None
+      // state: [0, 5) 4
+
+      // no-data batch triggered
+
+      // op1 W (0, 4)
+      // join output: None
+      // state: None
+      // op2 W (0, 4)
+      // agg: None
+      // output: None
+      // state: [0, 5) 4
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      // Move the watermark
+      MultiAddData(input1, 5)(input2, 5),
+
+      // op1 W (4, 4)
+      // join output: (5, 5)
+      // state: (5, 5)
+      // op2 W (4, 4)
+      // agg: [5, 10) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 1
+
+      // no-data batch triggered
+
+      // op1 W (4, 5)
+      // join output: None
+      // state: None
+      // op2 W (4, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
 
+  test("aggregation -> stream deduplication, append mode") {
+    val inputData = MemoryStream[Int]
+
+    val aggStream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .withColumn("windowEnd", expr("window.end"))
+
+    // dropDuplicates from aggStream without event time column for 
dropDuplicates - the
+    // state does not get trimmed due to watermark advancement.
+    val dedupNoEventTime = aggStream
+      .dropDuplicates("count", "windowEnd")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupNoEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 1), (10, 1), (15, 1)
+      // state: (5, 1), (10, 1), (15, 1)
+
+      CheckNewAnswer((5, 1), (10, 1), (15, 1)),
+      assertNumStateRows(Seq(3, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+
+    // Similar to the above but add event time. The dedup state will get 
trimmed.
+    val dedupWithEventTime = aggStream
+      .withColumn("windowTime", expr("window_time(window)"))
+      .withColumn("windowTimeMicros", expr("unix_micros(windowTime)"))
+      .dropDuplicates("count", "windowEnd", "windowTime")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"windowTimeMicros".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupWithEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)
+      // state: None - trimmed by watermark
+
+      CheckNewAnswer((5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
+
+  // TODO: This test should be modified or deleted after
+  // we support stream-stream join followed by aggregation
+  test("join on time interval -> window agg, append mode, should fail") {
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[(Int, Int)]
+    val inputDF2 = input2.toDS().toDF("start", "end")
+      .withColumn("eventTime2Start", timestamp_seconds($"start"))
+      .withColumn("eventTime2End", timestamp_seconds($"end"))
+      .withColumn("start2", timestamp_seconds($"start"))
+      .withWatermark("eventTime2Start", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2,
+      expr("eventTime1 >= eventTime2Start AND eventTime1 < eventTime2End " +
+        "AND eventTime1 = start2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+    var excpetionThrown: Boolean = false
+    try {
       testStream(stream)(
-        MultiAddData(input1, 1 to 4: _*)(input2, 1 to 4: _*),
-
-        // op1 W (0, 0)
-        // join output: (1, 1), (2, 2), (3, 3), (4, 4)
-        // state: (1, 1), (2, 2), (3, 3), (4, 4)
-        // op2 W (0, 0)
-        // agg: [0, 5) 4
-        // output: None
-        // state: [0, 5) 4
-
-        // no-data batch triggered
-
-        // op1 W (0, 4)
-        // join output: None
-        // state: None
-        // op2 W (0, 4)
-        // agg: None
-        // output: None
-        // state: [0, 5) 4
-        CheckNewAnswer(),
-        assertNumStateRows(Seq(1, 0)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0)),
-
-        // Move the watermark
-        MultiAddData(input1, 5)(input2, 5),
-
-        // op1 W (4, 4)
-        // join output: (5, 5)
-        // state: (5, 5)
-        // op2 W (4, 4)
-        // agg: [5, 10) 1
-        // output: None
-        // state: [0, 5) 4, [5, 10) 1
-
-        // no-data batch triggered
-
-        // op1 W (4, 5)
-        // join output: None
-        // state: None
-        // op2 W (4, 5)
-        // agg: None
-        // output: [0, 5) 4
-        // state: [5, 10) 1
-        CheckNewAnswer((0, 4)),
-        assertNumStateRows(Seq(1, 0)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
+        StartStream()
       )
+    } catch {
+      case t: AnalysisException =>
+        assert(t.getMessage.contains("stream-stream interval join " +
+          "followed by any stateful operator is not supported yet"))
+        excpetionThrown = true
     }
+    assert(excpetionThrown)
   }
 
-  test("aggregation -> stream deduplication, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val aggStream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .withColumn("windowEnd", expr("window.end"))
-
-      // dropDuplicates from aggStream without event time column for 
dropDuplicates - the
-      // state does not get trimmed due to watermark advancement.
-      val dedupNoEventTime = aggStream
-        .dropDuplicates("count", "windowEnd")
-        .select(
-          $"windowEnd".cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(dedupNoEventTime)(
-        AddData(inputData, 1, 5, 10, 15),
-
-        // op1 W (0, 0)
-        // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // output: None
-        // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // op2 W (0, 0)
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 15)
-        // agg: None
-        // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
-        // state: [15, 20) 1
-        // op2 W (0, 15)
-        // output: (5, 1), (10, 1), (15, 1)
-        // state: (5, 1), (10, 1), (15, 1)
-
-        CheckNewAnswer((5, 1), (10, 1), (15, 1)),
-        assertNumStateRows(Seq(3, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
+  // TODO: This test should be modified or deleted after
+  // we support stream-stream join followed by aggregation
+  test("join with range join on non-time intervals -> window agg, append mode, 
shouldn't fail") {
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withColumn("v1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[(Int, Int)]
+    val inputDF2 = input2.toDS().toDF("start", "end")
+      .withColumn("eventTime2Start", timestamp_seconds($"start"))
+      .withColumn("start2", timestamp_seconds($"start"))
+      .withColumn("end2", timestamp_seconds($"end"))
+      .withWatermark("eventTime2Start", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2,
+      expr("v1 >= start2 AND v1 < end2 " +
+        "AND eventTime1 = start2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+    testStream(stream)(
+      AddData(input1, 1, 2, 3, 4),
+      AddData(input2, (1, 2), (2, 3), (3, 4), (4, 5)),
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
 
-      // Similar to the above but add event time. The dedup state will get 
trimmed.
-      val dedupWithEventTime = aggStream
-        .withColumn("windowTime", expr("window_time(window)"))
-        .withColumn("windowTimeMicros", expr("unix_micros(windowTime)"))
-        .dropDuplicates("count", "windowEnd", "windowTime")
-        .select(
-          $"windowEnd".cast("long").as[Long],
-          $"windowTimeMicros".cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(dedupWithEventTime)(
-        AddData(inputData, 1, 5, 10, 15),
-
-        // op1 W (0, 0)
-        // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // output: None
-        // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // op2 W (0, 0)
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 15)
-        // agg: None
-        // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
-        // state: [15, 20) 1
-        // op2 W (0, 15)
-        // output: (5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)
-        // state: None - trimmed by watermark
-
-        CheckNewAnswer((5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+  // TODO: This test should be modified or deleted after

Review Comment:
   This is just obvious and should have covered in stream-stream join suite. We 
can remove the test case.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -40,378 +40,465 @@ class MultiStatefulOperatorsSuite
   }
 
   test("window agg -> window agg, append mode") {
-    // TODO: SPARK-40940 - Fix the unsupported ops checker to allow chaining 
of stateful ops.
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window($"window", "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 10 to 21: _*),
-        // op1 W (0, 0)
-        // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // output: None
-        // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 21)
-        // agg: None
-        // output: [10, 15) 5, [15, 20) 5
-        // state: [20, 25) 2
-        // op2 W (0, 21)
-        // agg: [10, 20) (2, 10)
-        // output: [10, 20) (2, 10)
-        // state: None
-        CheckNewAnswer((10, 2, 10)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0)),
-
-        AddData(inputData, 10 to 29: _*),
-        // op1 W (21, 21)
-        // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
-        // output: None
-        // state: [20, 25) 7, [25, 30) 5
-        // op2 W (21, 21)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (21, 29)
-        // agg: None
-        // output: [20, 25) 7
-        // state: [25, 30) 5
-        // op2 W (21, 29)
-        // agg: [20, 30) (1, 7)
-        // output: None
-        // state: [20, 30) (1, 7)
-        CheckNewAnswer(),
-        assertNumStateRows(Seq(1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 2)),
-
-        // Move the watermark.
-        AddData(inputData, 30, 31),
-        // op1 W (29, 29)
-        // agg: [30, 35) 2
-        // output: None
-        // state: [25, 30) 5 [30, 35) 2
-        // op2 W (29, 29)
-        // agg: None
-        // output: None
-        // state: [20, 30) (1, 7)
-
-        // no-data batch triggered
-
-        // op1 W (29, 31)
-        // agg: None
-        // output: [25, 30) 5
-        // state: [30, 35) 2
-        // op2 W (29, 31)
-        // agg: [20, 30) (2, 12)
-        // output: [20, 30) (2, 12)
-        // state: None
-        CheckNewAnswer((20, 2, 12)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window($"window", "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 10 to 21: _*),
+      // op1 W (0, 0)
+      // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // output: None
+      // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 21)
+      // agg: None
+      // output: [10, 15) 5, [15, 20) 5
+      // state: [20, 25) 2
+      // op2 W (0, 21)
+      // agg: [10, 20) (2, 10)
+      // output: [10, 20) (2, 10)
+      // state: None
+      CheckNewAnswer((10, 2, 10)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      AddData(inputData, 10 to 29: _*),
+      // op1 W (21, 21)
+      // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
+      // output: None
+      // state: [20, 25) 7, [25, 30) 5
+      // op2 W (21, 21)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (21, 29)
+      // agg: None
+      // output: [20, 25) 7
+      // state: [25, 30) 5
+      // op2 W (21, 29)
+      // agg: [20, 30) (1, 7)
+      // output: None
+      // state: [20, 30) (1, 7)
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 2)),
+
+      // Move the watermark.
+      AddData(inputData, 30, 31),
+      // op1 W (29, 29)
+      // agg: [30, 35) 2
+      // output: None
+      // state: [25, 30) 5 [30, 35) 2
+      // op2 W (29, 29)
+      // agg: None
+      // output: None
+      // state: [20, 30) (1, 7)
+
+      // no-data batch triggered
+
+      // op1 W (29, 31)
+      // agg: None
+      // output: [25, 30) 5
+      // state: [30, 35) 2
+      // op2 W (29, 31)
+      // agg: [20, 30) (2, 12)
+      // output: [20, 30) (2, 12)
+      // state: None
+      CheckNewAnswer((20, 2, 12)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("agg -> agg -> agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window(window_time($"window"), "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .groupBy(window(window_time($"window"), "20 seconds"))
-        .agg(count("*").as("count"), sum("sum").as("sum"))
-        .select(
-          $"window".getField("start").cast("long").as[Long],
-          $"window".getField("end").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 0 to 37: _*),
-        // op1 W (0, 0)
-        // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // output: None
-        // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, 
[25, 30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-        // op3 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 37)
-        // agg: None
-        // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, 
[25, 30) 5, [30, 35) 5
-        // state: [35, 40) 3
-        // op2 W (0, 37)
-        // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) 
(1, 5)
-        // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
-        // state: [30, 40) (1, 5)
-        // op3 W (0, 37)
-        // agg: [0, 20) (2, 20), [20, 40) (1, 10)
-        // output: [0, 20) (2, 20)
-        // state: [20, 40) (1, 10)
-        CheckNewAnswer((0, 20, 2, 20)),
-        assertNumStateRows(Seq(1, 1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
-
-        AddData(inputData, 30 to 60: _*),
-        // op1 W (37, 37)
-        // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in 
effect
-        // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, 
[60, 65) 1
-        // output: None
-        // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, 
[60, 65) 1
-        // op2 W (37, 37)
-        // output: None
-        // state: [30, 40) (1, 5)
-        // op3 W (37, 37)
-        // output: None
-        // state: [20, 40) (1, 10)
-
-        // no-data batch
-        // op1 W (37, 60)
-        // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
-        // state: [60, 65) 1
-        // op2 W (37, 60)
-        // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // state: None
-        // op3 W (37, 60)
-        // agg: [20, 40) (2, 23), [40, 60) (2, 20)
-        // output: [20, 40) (2, 23), [40, 60) (2, 20)
-        // state: None
-
-        CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
-        assertNumStateRows(Seq(0, 0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window(window_time($"window"), "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .groupBy(window(window_time($"window"), "20 seconds"))
+      .agg(count("*").as("count"), sum("sum").as("sum"))
+      .select(
+        $"window".getField("start").cast("long").as[Long],
+        $"window".getField("end").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 0 to 37: _*),
+      // op1 W (0, 0)
+      // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // output: None
+      // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+      // op3 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 37)
+      // agg: None
+      // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 
30) 5, [30, 35) 5
+      // state: [35, 40) 3
+      // op2 W (0, 37)
+      // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) 
(1, 5)
+      // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
+      // state: [30, 40) (1, 5)
+      // op3 W (0, 37)
+      // agg: [0, 20) (2, 20), [20, 40) (1, 10)
+      // output: [0, 20) (2, 20)
+      // state: [20, 40) (1, 10)
+      CheckNewAnswer((0, 20, 2, 20)),
+      assertNumStateRows(Seq(1, 1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
+
+      AddData(inputData, 30 to 60: _*),
+      // op1 W (37, 37)
+      // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in 
effect
+      // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 
65) 1
+      // output: None
+      // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, 
[60, 65) 1
+      // op2 W (37, 37)
+      // output: None
+      // state: [30, 40) (1, 5)
+      // op3 W (37, 37)
+      // output: None
+      // state: [20, 40) (1, 10)
+
+      // no-data batch
+      // op1 W (37, 60)
+      // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
+      // state: [60, 65) 1
+      // op2 W (37, 60)
+      // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // state: None
+      // op3 W (37, 60)
+      // agg: [20, 40) (2, 23), [40, 60) (2, 20)
+      // output: [20, 40) (2, 23), [40, 60) (2, 20)
+      // state: None
+
+      CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
+      assertNumStateRows(Seq(0, 0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
+    )
   }
 
   test("stream deduplication -> aggregation, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val deduplication = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "10 seconds")
-        .dropDuplicates("value", "eventTime")
-
-      val windowedAggregation = deduplication
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"), sum("value").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(windowedAggregation)(
-        AddData(inputData, 1 to 15: _*),
-        // op1 W (0, 0)
-        // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // deduplicated: None
-        // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 0)
-        // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-        // output: None
-        // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-
-        // no-data batch triggered
-
-        // op1 W (0, 5)
-        // agg: None
-        // output: None
-        // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 5)
-        // agg: None
-        // output: [0, 5) 4
-        // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
-        CheckNewAnswer((0, 4)),
-        assertNumStateRows(Seq(3, 10)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val deduplication = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates("value", "eventTime")
+
+    val windowedAggregation = deduplication
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"), sum("value").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 1 to 15: _*),
+      // op1 W (0, 0)
+      // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // deduplicated: None
+      // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 0)
+      // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+
+      // no-data batch triggered
+
+      // op1 W (0, 5)
+      // agg: None
+      // output: None
+      // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(3, 10)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("join -> window agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val input1 = MemoryStream[Int]
-      val inputDF1 = input1.toDF
-        .withColumnRenamed("value", "value1")
-        .withColumn("eventTime1", timestamp_seconds($"value1"))
-        .withWatermark("eventTime1", "0 seconds")
-
-      val input2 = MemoryStream[Int]
-      val inputDF2 = input2.toDF
-        .withColumnRenamed("value", "value2")
-        .withColumn("eventTime2", timestamp_seconds($"value2"))
-        .withWatermark("eventTime2", "0 seconds")
-
-      val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), 
"inner")
-        .groupBy(window($"eventTime1", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[Int]
+    val inputDF2 = input2.toDF
+      .withColumnRenamed("value", "value2")
+      .withColumn("eventTime2", timestamp_seconds($"value2"))
+      .withWatermark("eventTime2", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), 
"inner")
+      .groupBy(window($"eventTime1", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+    testStream(stream)(
+      MultiAddData(input1, 1 to 4: _*)(input2, 1 to 4: _*),
+
+      // op1 W (0, 0)
+      // join output: (1, 1), (2, 2), (3, 3), (4, 4)
+      // state: (1, 1), (2, 2), (3, 3), (4, 4)
+      // op2 W (0, 0)
+      // agg: [0, 5) 4
+      // output: None
+      // state: [0, 5) 4
+
+      // no-data batch triggered
+
+      // op1 W (0, 4)
+      // join output: None
+      // state: None
+      // op2 W (0, 4)
+      // agg: None
+      // output: None
+      // state: [0, 5) 4
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      // Move the watermark
+      MultiAddData(input1, 5)(input2, 5),
+
+      // op1 W (4, 4)
+      // join output: (5, 5)
+      // state: (5, 5)
+      // op2 W (4, 4)
+      // agg: [5, 10) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 1
+
+      // no-data batch triggered
+
+      // op1 W (4, 5)
+      // join output: None
+      // state: None
+      // op2 W (4, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
 
+  test("aggregation -> stream deduplication, append mode") {
+    val inputData = MemoryStream[Int]
+
+    val aggStream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .withColumn("windowEnd", expr("window.end"))
+
+    // dropDuplicates from aggStream without event time column for 
dropDuplicates - the
+    // state does not get trimmed due to watermark advancement.
+    val dedupNoEventTime = aggStream
+      .dropDuplicates("count", "windowEnd")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupNoEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 1), (10, 1), (15, 1)
+      // state: (5, 1), (10, 1), (15, 1)
+
+      CheckNewAnswer((5, 1), (10, 1), (15, 1)),
+      assertNumStateRows(Seq(3, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+
+    // Similar to the above but add event time. The dedup state will get 
trimmed.
+    val dedupWithEventTime = aggStream
+      .withColumn("windowTime", expr("window_time(window)"))
+      .withColumn("windowTimeMicros", expr("unix_micros(windowTime)"))
+      .dropDuplicates("count", "windowEnd", "windowTime")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"windowTimeMicros".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupWithEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)
+      // state: None - trimmed by watermark
+
+      CheckNewAnswer((5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
+
+  // TODO: This test should be modified or deleted after
+  // we support stream-stream join followed by aggregation
+  test("join on time interval -> window agg, append mode, should fail") {
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[(Int, Int)]
+    val inputDF2 = input2.toDS().toDF("start", "end")
+      .withColumn("eventTime2Start", timestamp_seconds($"start"))
+      .withColumn("eventTime2End", timestamp_seconds($"end"))
+      .withColumn("start2", timestamp_seconds($"start"))
+      .withWatermark("eventTime2Start", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2,
+      expr("eventTime1 >= eventTime2Start AND eventTime1 < eventTime2End " +
+        "AND eventTime1 = start2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+    var excpetionThrown: Boolean = false

Review Comment:
   Shall we try `intercept` instead?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The 
behavior is
    * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {

Review Comment:
   nit: could be combined with above line now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to