[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-15 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1023563189


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -42,40 +43,101 @@ object UnsupportedOperationChecker extends Logging {
   }
 
   /**
-   * Checks for possible correctness issue in chained stateful operators. The 
behavior is
-   * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
-   * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
-   * print a warning message.
+   * Checks if the expression has a event time column
+   * @param exp the expression to be checked
+   * @return true if it is a event time column.
*/
-  def checkStreamingQueryGlobalWatermarkLimit(
-  plan: LogicalPlan,
-  outputMode: OutputMode): Unit = {
-def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-  case s: Aggregate
-if s.isStreaming && outputMode == InternalOutputModes.Append => true
-  case Join(left, right, joinType, _, _)
-if left.isStreaming && right.isStreaming && joinType != Inner => true
-  case f: FlatMapGroupsWithState
-if f.isStreaming && f.outputMode == OutputMode.Append() => true
-  case _ => false
+  private def hasEventTimeCol(exp: Expression): Boolean = exp.exists {
+case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+case _ => false
+  }
+
+  /**
+   * Checks if the expression contains a range comparison, in which
+   * either side of the comparison is an event-time column. This is used for 
checking
+   * stream-stream time interval join.
+   * @param e the expression to be checked
+   * @return true if there is a time-interval join.
+   */
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = {
+def hasEventTimeColBinaryComp(neq: Expression): Boolean = {
+  val exp = neq.asInstanceOf[BinaryComparison]
+  hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
 }
 
-def isStatefulOperation(p: LogicalPlan): Boolean = p match {
-  case s: Aggregate if s.isStreaming => true
-  case _ @ Join(left, right, _, _, _) if left.isStreaming && 
right.isStreaming => true
-  case f: FlatMapGroupsWithState if f.isStreaming => true
-  case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
-  case d: Deduplicate if d.isStreaming => true
+e.exists {
+  case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | 
_: GreaterThan) =>
+hasEventTimeColBinaryComp(neq)
   case _ => false
 }
+  }
 
-val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
+  /**
+   * This method, combined with isStatefulOperationPossiblyEmitLateRows, 
determines all disallowed
+   * behaviors in multiple stateful operators.
+   * Concretely, All conditions defined below cannot be followed by any 
streaming stateful
+   * operator as defined in isStatefulOperationPossiblyEmitLateRows.
+   * @param p logical plan to be checked
+   * @param outputMode query output mode
+   * @return true if it is not allowed when followed by any streaming stateful
+   * operator as defined in isStatefulOperationPossiblyEmitLateRows.
+   */
+  private def ifCannotBeFollowedByStatefulOperation(
+  p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
+case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+  left.isStreaming && right.isStreaming &&
+otherCondition.isDefined && 
hasRangeExprAgainstEventTimeCol(otherCondition.get)
+// FlatMapGroupsWithState configured with event time
+case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, 
_, _, _, _)
+  if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _)
+  if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+case a: Aggregate if a.isStreaming && outputMode != 
InternalOutputModes.Append => true
+// Since the Distinct node will be replaced to Aggregate in the optimizer 
rule
+// [[ReplaceDistinctWithAggregate]], here we also need to check all 
Distinct node by
+// assuming it as Aggregate.
+case d @ Distinct(_: LogicalPlan) if d.isStreaming
+  && outputMode != InternalOutputModes.Append => true
+case _ => false
+  }
 
+  /**
+   * This method is only used with ifCannotBeFollowedByStatefulOperation.
+   * As can tell from the name, it doesn't contain ALL streaming stateful 
operations,
+   * only the stateful operations that are possible to emit late rows.
+   * for example, a Deduplicate without a event time column is still a 
stateful operation
+   * but of less interested because it won't emit late records because of 
watermark.
+   * @param p the logical plan to be checked
+   * @return 

[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-15 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1023563189


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -42,40 +43,101 @@ object UnsupportedOperationChecker extends Logging {
   }
 
   /**
-   * Checks for possible correctness issue in chained stateful operators. The 
behavior is
-   * controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
-   * Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
-   * print a warning message.
+   * Checks if the expression has a event time column
+   * @param exp the expression to be checked
+   * @return true if it is a event time column.
*/
-  def checkStreamingQueryGlobalWatermarkLimit(
-  plan: LogicalPlan,
-  outputMode: OutputMode): Unit = {
-def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-  case s: Aggregate
-if s.isStreaming && outputMode == InternalOutputModes.Append => true
-  case Join(left, right, joinType, _, _)
-if left.isStreaming && right.isStreaming && joinType != Inner => true
-  case f: FlatMapGroupsWithState
-if f.isStreaming && f.outputMode == OutputMode.Append() => true
-  case _ => false
+  private def hasEventTimeCol(exp: Expression): Boolean = exp.exists {
+case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+case _ => false
+  }
+
+  /**
+   * Checks if the expression contains a range comparison, in which
+   * either side of the comparison is an event-time column. This is used for 
checking
+   * stream-stream time interval join.
+   * @param e the expression to be checked
+   * @return true if there is a time-interval join.
+   */
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = {
+def hasEventTimeColBinaryComp(neq: Expression): Boolean = {
+  val exp = neq.asInstanceOf[BinaryComparison]
+  hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
 }
 
-def isStatefulOperation(p: LogicalPlan): Boolean = p match {
-  case s: Aggregate if s.isStreaming => true
-  case _ @ Join(left, right, _, _, _) if left.isStreaming && 
right.isStreaming => true
-  case f: FlatMapGroupsWithState if f.isStreaming => true
-  case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
-  case d: Deduplicate if d.isStreaming => true
+e.exists {
+  case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | 
_: GreaterThan) =>
+hasEventTimeColBinaryComp(neq)
   case _ => false
 }
+  }
 
-val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
+  /**
+   * This method, combined with isStatefulOperationPossiblyEmitLateRows, 
determines all disallowed
+   * behaviors in multiple stateful operators.
+   * Concretely, All conditions defined below cannot be followed by any 
streaming stateful
+   * operator as defined in isStatefulOperationPossiblyEmitLateRows.
+   * @param p logical plan to be checked
+   * @param outputMode query output mode
+   * @return true if it is not allowed when followed by any streaming stateful
+   * operator as defined in isStatefulOperationPossiblyEmitLateRows.
+   */
+  private def ifCannotBeFollowedByStatefulOperation(
+  p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
+case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+  left.isStreaming && right.isStreaming &&
+otherCondition.isDefined && 
hasRangeExprAgainstEventTimeCol(otherCondition.get)
+// FlatMapGroupsWithState configured with event time
+case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, 
_, _, _, _)
+  if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _)
+  if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+case a: Aggregate if a.isStreaming && outputMode != 
InternalOutputModes.Append => true
+// Since the Distinct node will be replaced to Aggregate in the optimizer 
rule
+// [[ReplaceDistinctWithAggregate]], here we also need to check all 
Distinct node by
+// assuming it as Aggregate.
+case d @ Distinct(_: LogicalPlan) if d.isStreaming
+  && outputMode != InternalOutputModes.Append => true
+case _ => false
+  }
 
+  /**
+   * This method is only used with ifCannotBeFollowedByStatefulOperation.
+   * As can tell from the name, it doesn't contain ALL streaming stateful 
operations,
+   * only the stateful operations that are possible to emit late rows.
+   * for example, a Deduplicate without a event time column is still a 
stateful operation
+   * but of less interested because it won't emit late records because of 
watermark.
+   * @param p the logical plan to be checked
+   * @return 

[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default

Review Comment:
   So every stateful operator has its own characteristics...
   
   1. streaming aggregation
   
   - Append mode requires the event time column to be set in the grouping key. It emits a record only once the watermark has passed the value of its event time column (this is the "delaying the output records" mentioned in the comment above).
   - For update mode and complete mode, having the event time column in the grouping key is optional; it only helps to evict state (in complete mode, even that doesn't happen). It won't delay the output records.
   - For update mode and complete mode, downstream operator(s) following the streaming aggregation must handle the outputs it produces in a semantically correct way. E.g. update mode produces outputs multiple times for the same aggregated output (i.e. corrections), and complete mode produces all historical aggregated outputs.
   - I can't imagine a valid case where streaming aggregation in update mode or complete mode is followed by another stateful operator.
   
   2. deduplication
   
   - The behavior is the same across all output modes. (More precisely, it "ignores" the output mode.)
   - It won't delay the output records.
   - It produces the same output only once.
   - It does not require the event time column to be set; the event time column is only used for eviction of the state rows.
   
   So the deduplication operator itself does not have any compatibility limitation. If a combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A.
   
   3. stream-stream join
   
   - I cannot reason about the proper behavior for update and complete mode.
   - It only accepts append mode now, so it's good to retain that restriction.
   - For an equality join, an inner join won't delay the outputs, whereas an outer join can still delay them.
   - For a time-interval join, it can delay the output records for both inner and outer joins.
   
   4. flatMapGroupsWithState
   
   - It doesn't support complete mode at all.
   - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do.
   - It loses the event time column in the output of the operator.
   - It supports processing-time semantics, which IMHO we should never allow to be used together with event-time semantics in other stateful operators.
   
   That said, maybe we still need a hand-made allow/block list for update/complete mode...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default

Review Comment:
   So every stateful operator has its own characteristics...
   
   1. streaming aggregation
   
   - Append mode requires the event time column to be set in the grouping key. It emits a record only once the watermark has passed the value of its event time column (this is the "delaying the output records" mentioned in the comment above).
   - For update mode and complete mode, having the event time column in the grouping key is optional; it only helps to evict state (in complete mode, even that doesn't happen). It won't delay the output records.
   - For update mode and complete mode, downstream operator(s) following the streaming aggregation must handle the outputs it produces in a semantically correct way. E.g. update mode produces outputs multiple times for the same aggregated output (i.e. corrections), and complete mode produces all historical aggregated outputs.
   - I can't imagine a valid case where streaming aggregation in update mode or complete mode is followed by another stateful operator.
   
   2. deduplication
   
   - The behavior is the same across all output modes. (More precisely, it "ignores" the output mode.)
   - It won't delay the output records.
   - It produces the same output only once.
   - It does not require the event time column to be set; the event time column is only used for eviction.
   
   So the deduplication operator itself does not have any compatibility limitation. If a combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A.
   
   3. stream-stream join
   
   - I cannot reason about the proper behavior for update and complete mode.
   - It only accepts append mode now, so it's good to retain that restriction.
   - For an equality join, an inner join won't delay the outputs, whereas an outer join can still delay them.
   - For a time-interval join, it can delay the output records for both inner and outer joins.
   
   4. flatMapGroupsWithState
   
   - It doesn't support complete mode at all.
   - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do.
   - It loses the event time column in the output of the operator.
   - It supports processing-time semantics, which IMHO we should never allow to be used together with event-time semantics in other stateful operators.
   
   That said, maybe we still need a hand-made allow/block list for update/complete mode...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default

Review Comment:
   So every stateful operator has its own characteristics...
   
   1. streaming aggregation
   
   - Append mode requires the event time column to be set in the grouping key. It emits a record only once the watermark has passed the value of its event time column (this is the "delaying the output records" mentioned in the comment above).
   - For update mode and complete mode, having the event time column in the grouping key is optional; it only helps to evict state (in complete mode, even that doesn't happen). It won't delay the output records.
   - For update mode and complete mode, downstream operator(s) following the streaming aggregation must handle the outputs it produces in a semantically correct way. E.g. update mode produces outputs multiple times for the same aggregated output (i.e. corrections), and complete mode produces all historical aggregated outputs.
   - I can't imagine a valid case where streaming aggregation in update mode or complete mode is followed by another stateful operator.
   
   2. deduplication
   
   - The behavior is the same across all output modes. (More precisely, it "ignores" the output mode.)
   - It won't delay the output records.
   - It produces the same output only once.
   
   So the deduplication operator itself does not have any compatibility limitation. If a combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A.
   
   3. stream-stream join
   
   - I cannot reason about the proper behavior for update and complete mode.
   - It only accepts append mode now, so it's good to retain that restriction.
   - For an equality join, an inner join won't delay the outputs, whereas an outer join can still delay them.
   - For a time-interval join, it can delay the output records for both inner and outer joins.
   
   4. flatMapGroupsWithState
   
   - It doesn't support complete mode at all.
   - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do.
   - It loses the event time column in the output of the operator.
   - It supports processing-time semantics, which IMHO we should never allow to be used together with event-time semantics in other stateful operators.
   
   That said, maybe we still need a hand-made allow/block list for update/complete mode...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default

Review Comment:
   So every stateful operator has its own characteristics...
   
   1. streaming aggregation
   
   - Append mode requires the event time column to be set in the grouping key. It emits a record only once the watermark has passed the value of its event time column (this is the "delaying the output records" mentioned in the comment above).
   - For update mode and complete mode, having the event time column in the grouping key is optional; it only helps to evict state (in complete mode, even that doesn't happen). It won't delay the output records.
   - For update mode and complete mode, downstream operator(s) following the streaming aggregation must handle the outputs it produces in a semantically correct way. E.g. update mode produces outputs multiple times for the same aggregated output (i.e. corrections), and complete mode produces all historical aggregated outputs.
   - I can't imagine a valid case where streaming aggregation in update mode or complete mode is followed by another stateful operator.
   
   2. deduplication
   
   - The behavior is the same across all output modes. (More precisely, it "ignores" the output mode.)
   - It won't delay the output records.
   - It produces the same output only once.
   
   So the deduplication operator itself does not have any compatibility limitation. If a combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A.
   
   3. stream-stream join
   
   - I cannot reason about the proper behavior for update and complete mode.
   - It only accepts append mode now, so it's good to retain that restriction.
   - For a time-interval join, it can delay the output records for both inner and outer joins. For an equality join, an inner join won't delay the outputs, whereas an outer join can still delay them.
   
   4. flatMapGroupsWithState
   
   - It doesn't support complete mode at all.
   - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do.
   - It loses the event time column in the output of the operator.
   - It supports processing-time semantics, which IMHO we should never allow to be used together with event-time semantics in other stateful operators.
   
   That said, maybe we still need a hand-made allow/block list for update/complete mode...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021196487


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default

Review Comment:
   If we sort this out properly, we could probably come up with a few golden rules from here, e.g. "no stateful operator is supported after stateful operator A in some output mode(s)". The list of rules would still have to be hand-made, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default

Review Comment:
   So every stateful operator has its own characteristics...
   
   1. streaming aggregation
   
   - Append mode requires the event time column to be set in the grouping key. It emits a record only once the watermark has passed the value of its event time column (this is the "delaying the output records" mentioned in the comment above).
   - For update mode and complete mode, having the event time column in the grouping key is optional; it only helps to evict state (in complete mode, even that doesn't happen). It won't delay the output records.
   - For update mode and complete mode, downstream operator(s) following the streaming aggregation must handle the outputs it produces in a semantically correct way. E.g. update mode produces outputs multiple times for the same aggregated output (i.e. corrections), and complete mode produces all historical aggregated outputs.
   - I can't imagine a valid case where streaming aggregation in update mode or complete mode is followed by another stateful operator.
   
   2. deduplication
   
   - The behavior is the same across all output modes. (More precisely, it "ignores" the output mode.)
   - It won't delay the output records.
   - It produces the same output only once.
   
   So the deduplication operator itself does not have any compatibility limitation. If a combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A.
   
   3. stream-stream join
   
   - I cannot reason about the proper behavior for update and complete mode.
   - It only accepts append mode now, so it's good to retain that restriction.
   
   4. flatMapGroupsWithState
   
   - It doesn't support complete mode at all.
   - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do.
   - It loses the event time column in the output of the operator.
   - It supports processing-time semantics, which IMHO we should never allow to be used together with event-time semantics in other stateful operators.
   
   That said, maybe we still need a hand-made allow/block list for update/complete mode...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021175378


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala:
##
@@ -240,25 +240,30 @@ class FlatMapGroupsInPandasWithStateSuite extends 
StateStoreMetricsTest {
 .groupBy("key")
 .count()
 
-testStream(result, Complete)(
-  AddData(inputData, "a"),
-  CheckNewAnswer(("a", 1)),
-  AddData(inputData, "a", "b"),
-  // mapGroups generates ("a", "2"), ("b", "1"); so increases counts of a 
and b by 1
-  CheckNewAnswer(("a", 2), ("b", 1)),
-  StopStream,
-  StartStream(),
-  AddData(inputData, "a", "b"),
-  // mapGroups should remove state for "a" and generate ("a", "-1"), ("b", 
"2") ;
-  // so increment a and b by 1
-  CheckNewAnswer(("a", 3), ("b", 2)),
-  StopStream,
-  StartStream(),
-  AddData(inputData, "a", "c"),
-  // mapGroups should recreate state for "a" and generate ("a", "1"), 
("c", "1") ;
-  // so increment a and c by 1
-  CheckNewAnswer(("a", 4), ("b", 2), ("c", 1))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default
+val exp = intercept[AnalysisException] {

Review Comment:
   In update and complete mode, streaming aggregation does not require the event time column to be part of the grouping key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-13 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021172211


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala:
##
@@ -240,25 +240,30 @@ class FlatMapGroupsInPandasWithStateSuite extends 
StateStoreMetricsTest {
 .groupBy("key")
 .count()
 
-testStream(result, Complete)(
-  AddData(inputData, "a"),
-  CheckNewAnswer(("a", 1)),
-  AddData(inputData, "a", "b"),
-  // mapGroups generates ("a", "2"), ("b", "1"); so increases counts of a 
and b by 1
-  CheckNewAnswer(("a", 2), ("b", 1)),
-  StopStream,
-  StartStream(),
-  AddData(inputData, "a", "b"),
-  // mapGroups should remove state for "a" and generate ("a", "-1"), ("b", 
"2") ;
-  // so increment a and b by 1
-  CheckNewAnswer(("a", 3), ("b", 2)),
-  StopStream,
-  StartStream(),
-  AddData(inputData, "a", "c"),
-  // mapGroups should recreate state for "a" and generate ("a", "1"), 
("c", "1") ;
-  // so increment a and c by 1
-  CheckNewAnswer(("a", 4), ("b", 2), ("c", 1))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default
+val exp = intercept[AnalysisException] {

Review Comment:
   (DISCLAIMER: I don't like complete mode which complicates things.)
   
   Look closely at the aggregation - it does not leverage event time, hence it doesn't hit the technical limitation on the output of flatMapGroupsWithState.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-13 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021160270


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##
@@ -188,17 +194,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
 expectedMsgs = Seq("Complete"))
 
   // FlatMapGroupsWithState(Update) in streaming with aggregation
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Update, Complete)) {
 assertNotSupportedInStreamingPlan(
   "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming 
relation " +
 s"with aggregation in $outputMode mode",
   TestFlatMapGroupsWithState(
 null, att, att, Seq(att), Seq(att), att, null, Update, 
isMapGroupsWithState = false, null,
 Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
   outputMode = outputMode,
-  expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with 
aggregation"))
+  expectedMsgs = Seq("Multiple stateful operators", "Update", "Complete"))
   }
 
+  assertNotSupportedInStreamingPlan(
+"flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming 
relation " +
+  s"with aggregation in Append mode",
+TestFlatMapGroupsWithState(
+  null, att, att, Seq(att), Seq(att), att, null, Update, 
isMapGroupsWithState = false, null,

Review Comment:
   This is one of the reasons I want the allowed operations to be more restrictive, rather than saying op A - op B is supported with some modes, op B - op A is supported with some modes, etc. 
   
   The major reason I blocked multiple stateful operators at all was that we figured out that a stream-stream outer join followed by another stream-stream outer join does not work, and I realized that is not the only such case. Similar risks are everywhere, and it's really hard to reason through the whole matrix of combinations.
   
   That said, unfortunately, a case that does not fail in the unsupported operation checker is not necessarily guaranteed to work properly. We have probably checked some cases, but I don't believe those checks are backed by theory (otherwise we would have known about the limitation of the global watermark much earlier).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-07 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016181346


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -169,16 +179,24 @@ object UnsupportedOperationChecker extends Logging {
   "DataFrames/Datasets")(plan)
 }
 
-// Disallow multiple streaming aggregations
-val aggregates = collectStreamingAggregates(plan)
+val statefulOps = plan.collect {
+  case p: LogicalPlan if isStatefulOperation(p) => p
+}
 
-if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {
+if (statefulOps.size > 1 &&
+  outputMode != InternalOutputModes.Append &&
+  SQLConf.get.statefulOperatorCorrectnessCheckEnabled) {

Review Comment:
   I think this configuration should no longer need to exist now that we have unlocked various use cases (except stream-stream time interval join followed by another stateful operator), but let's handle it as a separate issue. (I'll file a JIRA ticket for it.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-07 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016177874


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,34 +42,58 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = 
e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  private def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  /**
+   * Check if the given logical plan is a streaming stateful operations.
+   * @param p: The logical plan to be checked.
+   */
+  def isStatefulOperation(p: LogicalPlan): Boolean = {
+p match {
+  case s: Aggregate if s.isStreaming => true
+  // Since the Distinct node will be replaced to Aggregate in the 
optimizer rule
+  // [[ReplaceDistinctWithAggregate]], here we also need to check all 
Distinct node by
+  // assuming it as Aggregate.
+  case d @ Distinct(_: LogicalPlan) if d.isStreaming => true

Review Comment:
   Yeah, nice finding!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-07 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016177560


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -169,16 +179,24 @@ object UnsupportedOperationChecker extends Logging {
   "DataFrames/Datasets")(plan)
 }
 
-// Disallow multiple streaming aggregations
-val aggregates = collectStreamingAggregates(plan)
+val statefulOps = plan.collect {
+  case p: LogicalPlan if isStatefulOperation(p) => p
+}
 
-if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {
+if (statefulOps.size > 1 &&
+  outputMode != InternalOutputModes.Append &&

Review Comment:
   nit: indent this line and the next line by 2 more spaces



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-06 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014774005


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -157,10 +172,11 @@ object UnsupportedOperationChecker extends Logging {
 // Disallow multiple streaming aggregations
 val aggregates = collectStreamingAggregates(plan)
 
-if (aggregates.size > 1) {
+if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {

Review Comment:
   @WweiL You still need to change the code above to collect all stateful operators rather than only streaming aggregates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+plan match {
+  case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+left.isStreaming && right.isStreaming
+otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+  case _ => false
+}
+  }
+
   /**
* Checks for possible correctness issue in chained stateful operators. The 
behavior is
* controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
* print a warning message.
*/
   def checkStreamingQueryGlobalWatermarkLimit(
-  plan: LogicalPlan,
-  outputMode: OutputMode): Unit = {
+  plan: LogicalPlan): Unit = {
 def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-  case s: Aggregate
-if s.isStreaming && outputMode == InternalOutputModes.Append => true
   case Join(left, right, joinType, _, _)

Review Comment:
   We can remove this line as we support outer joins as well. We only have issues with stream-stream time-interval joins (of all join types) and flatMapGroupsWithState.
   (Arguably flatMapGroupsWithState followed by another stateful operator should be disallowed in all output modes, but I believe we will have a separate check for output mode, so it's OK.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+plan match {
+  case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+left.isStreaming && right.isStreaming
+otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+  case _ => false
+}
+  }
+
   /**
* Checks for possible correctness issue in chained stateful operators. The 
behavior is
* controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
* print a warning message.
*/
   def checkStreamingQueryGlobalWatermarkLimit(
-  plan: LogicalPlan,
-  outputMode: OutputMode): Unit = {
+  plan: LogicalPlan): Unit = {
 def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-  case s: Aggregate
-if s.isStreaming && outputMode == InternalOutputModes.Append => true
   case Join(left, right, joinType, _, _)

Review Comment:
   We can remove this line as we support outer joins as well. We only have issues with stream-stream time-interval joins (of all join types) and flatMapGroupsWithState.
   (Arguably flatMapGroupsWithState should be disallowed in all output modes, but I believe we have a separate check for output mode, so it's OK.)



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+plan match {
+  case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+left.isStreaming && right.isStreaming
+otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+  case _ => false
+}
+  }
+
   /**
* Checks for possible correctness issue in chained stateful operators. The 
behavior is
* controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
* print a warning message.
*/
   def checkStreamingQueryGlobalWatermarkLimit(
-  plan: LogicalPlan,
-  outputMode: OutputMode): Unit = {
+  plan: LogicalPlan): Unit = {
 def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-  case s: Aggregate
-if s.isStreaming && outputMode == InternalOutputModes.Append => true
   case Join(left, right, joinType, _, _)

Review Comment:
   We can remove this line as we support outer joins as well. We only have issues with stream-stream time-interval joins (of all join types) and flatMapGroupsWithState.
   (Arguably flatMapGroupsWithState should be disallowed in all output modes, but I believe we will have a separate check for output mode, so it's OK.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+plan match {
+  case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+left.isStreaming && right.isStreaming
+otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+  case _ => false
+}
+  }
+
   /**
* Checks for possible correctness issue in chained stateful operators. The 
behavior is
* controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
* print a warning message.
*/
   def checkStreamingQueryGlobalWatermarkLimit(
-  plan: LogicalPlan,
-  outputMode: OutputMode): Unit = {
+  plan: LogicalPlan): Unit = {
 def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-  case s: Aggregate
-if s.isStreaming && outputMode == InternalOutputModes.Append => true
   case Join(left, right, joinType, _, _)

Review Comment:
   We can remove this line as we support outer joins as well. We only have issues with stream-stream time-interval joins and flatMapGroupsWithState.
   (Arguably flatMapGroupsWithState should be disallowed in all output modes, but I believe we have a separate check for output mode, so it's OK.)



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+plan match {
+  case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+left.isStreaming && right.isStreaming

Review Comment:
   Maybe missing `&&`? Otherwise this line would be a no-op.



##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##
@@ -507,15 +507,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
   assertPassOnGlobalWatermarkLimit(
 s"single $joinType join in Append mode",
 streamRelation.join(streamRelation, joinType = RightOuter,
-  condition = Some(attributeWithWatermark === attribute)),

Review Comment:
   The above code comment and related tests are outdated. 
   
   For (time-window) equality joins, all join types (inner, left outer, right outer, and full outer) should just work with the recent fix for late record filtering. For time-interval joins, no join type will work if it is followed by another stateful operator.
   
   That said, we should now have additional test cases specifically for time-interval joins to check the different expectation.



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) ||