[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1023563189 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -42,40 +43,101 @@ object UnsupportedOperationChecker extends Logging { } /** - * Checks for possible correctness issue in chained stateful operators. The behavior is - * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. - * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just - * print a warning message. + * Checks if the expression has a event time column + * @param exp the expression to be checked + * @return true if it is a event time column. */ - def checkStreamingQueryGlobalWatermarkLimit( - plan: LogicalPlan, - outputMode: OutputMode): Unit = { -def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { - case s: Aggregate -if s.isStreaming && outputMode == InternalOutputModes.Append => true - case Join(left, right, joinType, _, _) -if left.isStreaming && right.isStreaming && joinType != Inner => true - case f: FlatMapGroupsWithState -if f.isStreaming && f.outputMode == OutputMode.Append() => true - case _ => false + private def hasEventTimeCol(exp: Expression): Boolean = exp.exists { +case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) +case _ => false + } + + /** + * Checks if the expression contains a range comparison, in which + * either side of the comparison is an event-time column. This is used for checking + * stream-stream time interval join. + * @param e the expression to be checked + * @return true if there is a time-interval join. 
+ */ + private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = { +def hasEventTimeColBinaryComp(neq: Expression): Boolean = { + val exp = neq.asInstanceOf[BinaryComparison] + hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) } -def isStatefulOperation(p: LogicalPlan): Boolean = p match { - case s: Aggregate if s.isStreaming => true - case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true - case f: FlatMapGroupsWithState if f.isStreaming => true - case f: FlatMapGroupsInPandasWithState if f.isStreaming => true - case d: Deduplicate if d.isStreaming => true +e.exists { + case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => +hasEventTimeColBinaryComp(neq) case _ => false } + } -val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled + /** + * This method, combined with isStatefulOperationPossiblyEmitLateRows, determines all disallowed + * behaviors in multiple stateful operators. + * Concretely, All conditions defined below cannot be followed by any streaming stateful + * operator as defined in isStatefulOperationPossiblyEmitLateRows. + * @param p logical plan to be checked + * @param outputMode query output mode + * @return true if it is not allowed when followed by any streaming stateful + * operator as defined in isStatefulOperationPossiblyEmitLateRows. 
+ */ + private def ifCannotBeFollowedByStatefulOperation( + p: LogicalPlan, outputMode: OutputMode): Boolean = p match { +case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => + left.isStreaming && right.isStreaming && +otherCondition.isDefined && hasRangeExprAgainstEventTimeCol(otherCondition.get) +// FlatMapGroupsWithState configured with event time +case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, _, _, _, _) + if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true +case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _) + if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true +case a: Aggregate if a.isStreaming && outputMode != InternalOutputModes.Append => true +// Since the Distinct node will be replaced to Aggregate in the optimizer rule +// [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by +// assuming it as Aggregate. +case d @ Distinct(_: LogicalPlan) if d.isStreaming + && outputMode != InternalOutputModes.Append => true +case _ => false + } + /** + * This method is only used with ifCannotBeFollowedByStatefulOperation. + * As can tell from the name, it doesn't contain ALL streaming stateful operations, + * only the stateful operations that are possible to emit late rows. + * for example, a Deduplicate without a event time column is still a stateful operation + * but of less interested because it won't emit late records because of watermark. + * @param p the logical plan to be checked + * @return
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1023563189 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -42,40 +43,101 @@ object UnsupportedOperationChecker extends Logging { } /** - * Checks for possible correctness issue in chained stateful operators. The behavior is - * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. - * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just - * print a warning message. + * Checks if the expression has a event time column + * @param exp the expression to be checked + * @return true if it is a event time column. */ - def checkStreamingQueryGlobalWatermarkLimit( - plan: LogicalPlan, - outputMode: OutputMode): Unit = { -def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { - case s: Aggregate -if s.isStreaming && outputMode == InternalOutputModes.Append => true - case Join(left, right, joinType, _, _) -if left.isStreaming && right.isStreaming && joinType != Inner => true - case f: FlatMapGroupsWithState -if f.isStreaming && f.outputMode == OutputMode.Append() => true - case _ => false + private def hasEventTimeCol(exp: Expression): Boolean = exp.exists { +case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) +case _ => false + } + + /** + * Checks if the expression contains a range comparison, in which + * either side of the comparison is an event-time column. This is used for checking + * stream-stream time interval join. + * @param e the expression to be checked + * @return true if there is a time-interval join. 
+ */ + private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = { +def hasEventTimeColBinaryComp(neq: Expression): Boolean = { + val exp = neq.asInstanceOf[BinaryComparison] + hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) } -def isStatefulOperation(p: LogicalPlan): Boolean = p match { - case s: Aggregate if s.isStreaming => true - case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true - case f: FlatMapGroupsWithState if f.isStreaming => true - case f: FlatMapGroupsInPandasWithState if f.isStreaming => true - case d: Deduplicate if d.isStreaming => true +e.exists { + case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => +hasEventTimeColBinaryComp(neq) case _ => false } + } -val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled + /** + * This method, combined with isStatefulOperationPossiblyEmitLateRows, determines all disallowed + * behaviors in multiple stateful operators. + * Concretely, All conditions defined below cannot be followed by any streaming stateful + * operator as defined in isStatefulOperationPossiblyEmitLateRows. + * @param p logical plan to be checked + * @param outputMode query output mode + * @return true if it is not allowed when followed by any streaming stateful + * operator as defined in isStatefulOperationPossiblyEmitLateRows. 
+ */ + private def ifCannotBeFollowedByStatefulOperation( + p: LogicalPlan, outputMode: OutputMode): Boolean = p match { +case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => + left.isStreaming && right.isStreaming && +otherCondition.isDefined && hasRangeExprAgainstEventTimeCol(otherCondition.get) +// FlatMapGroupsWithState configured with event time +case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, _, _, _, _) + if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true +case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _) + if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true +case a: Aggregate if a.isStreaming && outputMode != InternalOutputModes.Append => true +// Since the Distinct node will be replaced to Aggregate in the optimizer rule +// [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by +// assuming it as Aggregate. +case d @ Distinct(_: LogicalPlan) if d.isStreaming + && outputMode != InternalOutputModes.Append => true +case _ => false + } + /** + * This method is only used with ifCannotBeFollowedByStatefulOperation. + * As can tell from the name, it doesn't contain ALL streaming stateful operations, + * only the stateful operations that are possible to emit late rows. + * for example, a Deduplicate without a event time column is still a stateful operation + * but of less interested because it won't emit late records because of watermark. + * @param p the logical plan to be checked + * @return
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ## @@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { .agg(sum("num")) .as[(String, Long)] -testStream(result, Update)( - AddData(inputData, "a" -> 1), - CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)), - AddData(inputData, "a" -> 1), // Dropped - CheckLastBatch(), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)), - AddData(inputData, "a" -> 2), - CheckLastBatch("a" -> 3L), - assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)), - AddData(inputData, "b" -> 1), - CheckLastBatch("b" -> 1L), - assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L)) -) +// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default Review Comment: So every stateful operators have its own characteristics... 1. streaming aggregation - append mode requires event time column to be set in grouping key. It will emit records which watermark passed by the value of event time column. (delaying the output records in above comment) - For update mode and complete mode, having event time column in grouping key is optional. It only helps to evict the state. (In complete mode this even doesn't happen.) It won't delay the output records. - For update mode and complete mode, downstream operator(s) followed by streaming aggregation must handle the outputs streaming aggregation produces semantically properly. E.g. update mode will produce outputs multiple times for the same aggregated output, say, correction. complete mode will produce all the historical aggregated outputs. - I can't imagine the valid case for update mode and complete mode of streaming aggregation to be followed by another stateful operator. 2. 
deduplication - The behavior is the same among all output modes. (More clearly, it "ignores" the output mode.) - It won't delay the output records. - It produces the same output only once. - It does not require an event time column to be set. The event time column is only used for eviction of the state rows. So, the deduplication operator itself does not have any compatibility limitation. If the combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A. 3. stream-stream join - I cannot reason about the proper behavior for update and complete mode. - It only accepts append mode now, so it is good to retain that restriction. - For the equality join, an inner join won't delay the outputs whereas an outer join is still able to delay the outputs. - For the time-interval join, it can delay the output records in both inner and outer joins. 4. flatMapGroupsWithState - It doesn't support complete mode at all. - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do. - It loses the event time column on the output of the operator. - It supports processing-time semantics, which IMHO we should never allow to be used together with event-time semantics in other stateful operators. That said, maybe we still need a hand-made enumeration of an allow/block list for update/complete mode... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ## @@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { .agg(sum("num")) .as[(String, Long)] -testStream(result, Update)( - AddData(inputData, "a" -> 1), - CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)), - AddData(inputData, "a" -> 1), // Dropped - CheckLastBatch(), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)), - AddData(inputData, "a" -> 2), - CheckLastBatch("a" -> 3L), - assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)), - AddData(inputData, "b" -> 1), - CheckLastBatch("b" -> 1L), - assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L)) -) +// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default Review Comment: So every stateful operators have its own characteristics... 1. streaming aggregation - append mode requires event time column to be set in grouping key. It will emit records which watermark passed by the value of event time column. (delaying the output records in above comment) - For update mode and complete mode, having event time column in grouping key is optional. It only helps to evict the state. (In complete mode this even doesn't happen.) It won't delay the output records. - For update mode and complete mode, downstream operator(s) followed by streaming aggregation must handle the outputs streaming aggregation produces semantically properly. E.g. update mode will produce outputs multiple times for the same aggregated output, say, correction. complete mode will produce all the historical aggregated outputs. - I can't imagine the valid case for update mode and complete mode of streaming aggregation to be followed by another stateful operator. 2. 
deduplication - The behavior is the same among all output modes. (More clearly, it "ignores" the output mode.) - It won't delay the output records. - It produces the same output only once. - It does not require an event time column to be set. The event time column is only used for eviction. So, the deduplication operator itself does not have any compatibility limitation. If the combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A. 3. stream-stream join - I cannot reason about the proper behavior for update and complete mode. - It only accepts append mode now, so it is good to retain that restriction. - For the equality join, an inner join won't delay the outputs whereas an outer join is still able to delay the outputs. - For the time-interval join, it can delay the output records in both inner and outer joins. 4. flatMapGroupsWithState - It doesn't support complete mode at all. - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do. - It loses the event time column on the output of the operator. - It supports processing-time semantics, which IMHO we should never allow to be used together with event-time semantics in other stateful operators. That said, maybe we still need a hand-made enumeration of an allow/block list for update/complete mode... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ## @@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { .agg(sum("num")) .as[(String, Long)] -testStream(result, Update)( - AddData(inputData, "a" -> 1), - CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)), - AddData(inputData, "a" -> 1), // Dropped - CheckLastBatch(), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)), - AddData(inputData, "a" -> 2), - CheckLastBatch("a" -> 3L), - assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)), - AddData(inputData, "b" -> 1), - CheckLastBatch("b" -> 1L), - assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L)) -) +// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default Review Comment: So every stateful operators have its own characteristics... 1. streaming aggregation - append mode requires event time column to be set in grouping key. It will emit records which watermark passed by the value of event time column. (delaying the output records in above comment) - For update mode and complete mode, having event time column in grouping key is optional. It only helps to evict the state. (In complete mode this even doesn't happen.) It won't delay the output records. - For update mode and complete mode, downstream operator(s) followed by streaming aggregation must handle the outputs streaming aggregation produces semantically properly. E.g. update mode will produce outputs multiple times for the same aggregated output, say, correction. complete mode will produce all the historical aggregated outputs. - I can't imagine the valid case for update mode and complete mode of streaming aggregation to be followed by another stateful operator. 2. 
deduplication - The behavior is the same among all output modes. (More clearly, it "ignores" the output mode.) - It won't delay the output records. - It produces the same output only once. So, the deduplication operator itself does not have any compatibility limitation. If the combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A. 3. stream-stream join - I cannot reason about the proper behavior for update and complete mode. - It only accepts append mode now, so it is good to retain that restriction. - For the equality join, an inner join won't delay the outputs whereas an outer join is still able to delay the outputs. - For the time-interval join, it can delay the output records in both inner and outer joins. 4. flatMapGroupsWithState - It doesn't support complete mode at all. - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do. - It loses the event time column on the output of the operator. - It supports processing-time semantics, which IMHO we should never allow to be used together with event-time semantics in other stateful operators. That said, maybe we still need a hand-made enumeration of an allow/block list for update/complete mode... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ## @@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { .agg(sum("num")) .as[(String, Long)] -testStream(result, Update)( - AddData(inputData, "a" -> 1), - CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)), - AddData(inputData, "a" -> 1), // Dropped - CheckLastBatch(), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)), - AddData(inputData, "a" -> 2), - CheckLastBatch("a" -> 3L), - assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)), - AddData(inputData, "b" -> 1), - CheckLastBatch("b" -> 1L), - assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L)) -) +// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default Review Comment: So every stateful operators have its own characteristics... 1. streaming aggregation - append mode requires event time column to be set in grouping key. It will emit records which watermark passed by the value of event time column. (delaying the output records in above comment) - For update mode and complete mode, having event time column in grouping key is optional. It only helps to evict the state. (In complete mode this even doesn't happen.) It won't delay the output records. - For update mode and complete mode, downstream operator(s) followed by streaming aggregation must handle the outputs streaming aggregation produces semantically properly. E.g. update mode will produce outputs multiple times for the same aggregated output, say, correction. complete mode will produce all the historical aggregated outputs. - I can't imagine the valid case for update mode and complete mode of streaming aggregation to be followed by another stateful operator. 2. 
deduplication - The behavior is the same among all output modes. (More clearly, it "ignores" the output mode.) - It won't delay the output records. - It produces the same output only once. So, the deduplication operator itself does not have any compatibility limitation. If the combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A. 3. stream-stream join - I cannot reason about the proper behavior for update and complete mode. - It only accepts append mode now, so it is good to retain that restriction. - For a time-interval join, it can delay the output records in both inner and outer joins. For the equality join, an inner join won't delay the outputs whereas an outer join is still able to delay the outputs. 4. flatMapGroupsWithState - It doesn't support complete mode at all. - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do. - It loses the event time column on the output of the operator. - It supports processing-time semantics, which IMHO we should never allow to be used together with event-time semantics in other stateful operators. That said, maybe we still need a hand-made enumeration of an allow/block list for update/complete mode... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1021196487 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ## @@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { .agg(sum("num")) .as[(String, Long)] -testStream(result, Update)( - AddData(inputData, "a" -> 1), - CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)), - AddData(inputData, "a" -> 1), // Dropped - CheckLastBatch(), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)), - AddData(inputData, "a" -> 2), - CheckLastBatch("a" -> 3L), - assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)), - AddData(inputData, "b" -> 1), - CheckLastBatch("b" -> 1L), - assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L)) -) +// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default Review Comment: If we sort this out properly, then we probably could come up with a couple of (or several) golden rules from here e.g. no stateful operator is supported after stateful operator A in some output mode(s). The list of rules has to still be a hand-made one though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ## @@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { .agg(sum("num")) .as[(String, Long)] -testStream(result, Update)( - AddData(inputData, "a" -> 1), - CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)), - AddData(inputData, "a" -> 1), // Dropped - CheckLastBatch(), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)), - AddData(inputData, "a" -> 2), - CheckLastBatch("a" -> 3L), - assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)), - AddData(inputData, "b" -> 1), - CheckLastBatch("b" -> 1L), - assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L)) -) +// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default Review Comment: So every stateful operators have its own characteristics... 1. streaming aggregation - append mode requires event time column to be set in grouping key. It will emit records which watermark passed by the value of event time column. (delaying the output records in above comment) - For update mode and complete mode, having event time column in grouping key is optional. It only helps to evict the state. (In complete mode this even doesn't happen.) It won't delay the output records. - For update mode and complete mode, downstream operator(s) followed by streaming aggregation must handle the outputs streaming aggregation produces semantically properly. E.g. update mode will produce outputs multiple times for the same aggregated output, say, correction. complete mode will produce all the historical aggregated outputs. - I can't imagine the valid case for update mode and complete mode of streaming aggregation to be followed by another stateful operator. 2. 
deduplication - The behavior is the same among all output modes. (More clearly, it "ignores" the output mode.) - It won't delay the output records. - It produces the same output only once. So, the deduplication operator itself does not have any compatibility limitation. If the combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A. 3. stream-stream join - I cannot reason about the proper behavior for update and complete mode. - It only accepts append mode now, so it is good to retain that restriction. 4. flatMapGroupsWithState - It doesn't support complete mode at all. - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do. - It loses the event time column on the output of the operator. - It supports processing-time semantics, which IMHO we should never allow to be used together with event-time semantics in other stateful operators. That said, maybe we still need a hand-made enumeration of an allow/block list for update/complete mode... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1021175378 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala: ## @@ -240,25 +240,30 @@ class FlatMapGroupsInPandasWithStateSuite extends StateStoreMetricsTest { .groupBy("key") .count() -testStream(result, Complete)( - AddData(inputData, "a"), - CheckNewAnswer(("a", 1)), - AddData(inputData, "a", "b"), - // mapGroups generates ("a", "2"), ("b", "1"); so increases counts of a and b by 1 - CheckNewAnswer(("a", 2), ("b", 1)), - StopStream, - StartStream(), - AddData(inputData, "a", "b"), - // mapGroups should remove state for "a" and generate ("a", "-1"), ("b", "2") ; - // so increment a and b by 1 - CheckNewAnswer(("a", 3), ("b", 2)), - StopStream, - StartStream(), - AddData(inputData, "a", "c"), - // mapGroups should recreate state for "a" and generate ("a", "1"), ("c", "1") ; - // so increment a and c by 1 - CheckNewAnswer(("a", 4), ("b", 2), ("c", 1)) -) +// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default +val exp = intercept[AnalysisException] { Review Comment: In update and complete mode, streaming aggregation does not require event time column to be a part of grouping key. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1021172211 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala: ## @@ -240,25 +240,30 @@ class FlatMapGroupsInPandasWithStateSuite extends StateStoreMetricsTest { .groupBy("key") .count() -testStream(result, Complete)( - AddData(inputData, "a"), - CheckNewAnswer(("a", 1)), - AddData(inputData, "a", "b"), - // mapGroups generates ("a", "2"), ("b", "1"); so increases counts of a and b by 1 - CheckNewAnswer(("a", 2), ("b", 1)), - StopStream, - StartStream(), - AddData(inputData, "a", "b"), - // mapGroups should remove state for "a" and generate ("a", "-1"), ("b", "2") ; - // so increment a and b by 1 - CheckNewAnswer(("a", 3), ("b", 2)), - StopStream, - StartStream(), - AddData(inputData, "a", "c"), - // mapGroups should recreate state for "a" and generate ("a", "1"), ("c", "1") ; - // so increment a and c by 1 - CheckNewAnswer(("a", 4), ("b", 2), ("c", 1)) -) +// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default +val exp = intercept[AnalysisException] { Review Comment: (DISCLAIMER: I don't like complete mode, which complicates things.) Look at the aggregation closely - it does not leverage event time, hence it doesn't trigger the technical limitation of the output of flatMapGroupsWithState. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1021160270 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala: ## @@ -188,17 +194,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { expectedMsgs = Seq("Complete")) // FlatMapGroupsWithState(Update) in streaming with aggregation - for (outputMode <- Seq(Append, Update, Complete)) { + for (outputMode <- Seq(Update, Complete)) { assertNotSupportedInStreamingPlan( "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " + s"with aggregation in $outputMode mode", TestFlatMapGroupsWithState( null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null, Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)), outputMode = outputMode, - expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with aggregation")) + expectedMsgs = Seq("Multiple stateful operators", "Update", "Complete")) } + assertNotSupportedInStreamingPlan( +"flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " + + s"with aggregation in Append mode", +TestFlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null, Review Comment: This is one of the reasons I want the allowed operations to be more restrictive, rather than saying op A - op B is supported with some modes, op B - op A is supported with some modes, etc. The major reason I blocked multiple stateful operators altogether was that we figured out that stream-stream outer join followed by another stream-stream outer join does not work, and I realized that it is not the only such case. Similar risks are everywhere, and it's really hard to reason through the whole matrix. That said, unfortunately, the fact that the unsupported operation checker does not fail on a case does not always mean we guarantee that the case works properly.
We have probably checked some cases, but I don't believe it's backed by theory (otherwise we would have known the limit of global watermark much earlier). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1016181346 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -169,16 +179,24 @@ object UnsupportedOperationChecker extends Logging { "DataFrames/Datasets")(plan) } -// Disallow multiple streaming aggregations -val aggregates = collectStreamingAggregates(plan) +val statefulOps = plan.collect { + case p: LogicalPlan if isStatefulOperation(p) => p +} -if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) { +if (statefulOps.size > 1 && + outputMode != InternalOutputModes.Append && + SQLConf.get.statefulOperatorCorrectnessCheckEnabled) { Review Comment: I think this configuration should no longer exist now that we have unlocked various use cases (except stream-stream time interval join followed by another stateful operator), but let's handle it as a separate issue. (I'll file a JIRA ticket on it.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1016177874 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -41,34 +42,58 @@ object UnsupportedOperationChecker extends Logging { } } + private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists { +case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) +case _ => false + } + + private def hasEventTimeColNeq(neq: Expression): Boolean = { +val exp = neq.asInstanceOf[BinaryComparison] +hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + private def hasEventTimeCol(exps: Expression): Boolean = +exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false +} + + /** + * Check if the given logical plan is a streaming stateful operations. + * @param p: The logical plan to be checked. + */ + def isStatefulOperation(p: LogicalPlan): Boolean = { +p match { + case s: Aggregate if s.isStreaming => true + // Since the Distinct node will be replaced to Aggregate in the optimizer rule + // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by + // assuming it as Aggregate. + case d @ Distinct(_: LogicalPlan) if d.isStreaming => true Review Comment: Yeah, nice finding! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1016177560 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -169,16 +179,24 @@ object UnsupportedOperationChecker extends Logging { "DataFrames/Datasets")(plan) } -// Disallow multiple streaming aggregations -val aggregates = collectStreamingAggregates(plan) +val statefulOps = plan.collect { + case p: LogicalPlan if isStatefulOperation(p) => p +} -if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) { +if (statefulOps.size > 1 && + outputMode != InternalOutputModes.Append && Review Comment: nit: indent this line and next line (2 spaces more) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1014774005 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -157,10 +172,11 @@ object UnsupportedOperationChecker extends Logging { // Disallow multiple streaming aggregations val aggregates = collectStreamingAggregates(plan) -if (aggregates.size > 1) { +if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) { Review Comment: @WweiL You still need to change above to collect all stateful operators rather than only streaming aggregates. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging { } } + def hasRangeExpr(e: Expression): Boolean = e.exists { +case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) +case _ => false + } + + def hasEventTimeColNeq(neq: Expression): Boolean = { +val exp = neq.asInstanceOf[BinaryComparison] +hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + def hasEventTimeCol(exps: Expression): Boolean = +exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false +} + + // TODO: This function and hasRangeExpr + // should be deleted after we support range join with states + def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = { +plan match { + case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => +left.isStreaming && right.isStreaming +otherCondition.isDefined && hasRangeExpr(otherCondition.get) + case _ => false +} + } + /** * Checks for possible correctness issue in chained stateful operators. The behavior is * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just * print a warning message. */ def checkStreamingQueryGlobalWatermarkLimit( - plan: LogicalPlan, - outputMode: OutputMode): Unit = { + plan: LogicalPlan): Unit = { def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { - case s: Aggregate -if s.isStreaming && outputMode == InternalOutputModes.Append => true case Join(left, right, joinType, _, _) Review Comment: We can remove this line as we support outer join as well. 
We only have an issue with stream-stream time-interval join (of all join types) and flatMapGroupsWithState. (Arguably, flatMapGroupsWithState followed by another stateful operator should be disallowed in all output modes, but I believe we will have a separate check for output mode, so this is OK.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging { } } + def hasRangeExpr(e: Expression): Boolean = e.exists { +case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) +case _ => false + } + + def hasEventTimeColNeq(neq: Expression): Boolean = { +val exp = neq.asInstanceOf[BinaryComparison] +hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + def hasEventTimeCol(exps: Expression): Boolean = +exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false +} + + // TODO: This function and hasRangeExpr + // should be deleted after we support range join with states + def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = { +plan match { + case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => +left.isStreaming && right.isStreaming +otherCondition.isDefined && hasRangeExpr(otherCondition.get) + case _ => false +} + } + /** * Checks for possible correctness issue in chained stateful operators. The behavior is * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just * print a warning message. */ def checkStreamingQueryGlobalWatermarkLimit( - plan: LogicalPlan, - outputMode: OutputMode): Unit = { + plan: LogicalPlan): Unit = { def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { - case s: Aggregate -if s.isStreaming && outputMode == InternalOutputModes.Append => true case Join(left, right, joinType, _, _) Review Comment: We can remove this line as we support outer join as well. 
We only have issue with stream-stream time interval join (with all types) and flatMapGroupsWithState. (Arguably flatMapGroupsWithState with all output modes should be disallowed, but I believe we have a separate check for output mode so OK.) ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging { } } + def hasRangeExpr(e: Expression): Boolean = e.exists { +case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) +case _ => false + } + + def hasEventTimeColNeq(neq: Expression): Boolean = { +val exp = neq.asInstanceOf[BinaryComparison] +hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + def hasEventTimeCol(exps: Expression): Boolean = +exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false +} + + // TODO: This function and hasRangeExpr + // should be deleted after we support range join with states + def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = { +plan match { + case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => +left.isStreaming && right.isStreaming +otherCondition.isDefined && hasRangeExpr(otherCondition.get) + case _ => false +} + } + /** * Checks for possible correctness issue in chained stateful operators. The behavior is * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just * print a warning message. 
*/ def checkStreamingQueryGlobalWatermarkLimit( - plan: LogicalPlan, - outputMode: OutputMode): Unit = { + plan: LogicalPlan): Unit = { def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { - case s: Aggregate -if s.isStreaming && outputMode == InternalOutputModes.Append => true case Join(left, right, joinType, _, _) Review Comment: We can remove this line as we support outer join as well. We only have issue with stream-stream time interval join (with all types) and flatMapGroupsWithState. (Arguably flatMapGroupsWithState with all output modes should be disallowed, but I believe we will have a separate check for output mode so OK.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging { } } + def hasRangeExpr(e: Expression): Boolean = e.exists { +case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) +case _ => false + } + + def hasEventTimeColNeq(neq: Expression): Boolean = { +val exp = neq.asInstanceOf[BinaryComparison] +hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + def hasEventTimeCol(exps: Expression): Boolean = +exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false +} + + // TODO: This function and hasRangeExpr + // should be deleted after we support range join with states + def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = { +plan match { + case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => +left.isStreaming && right.isStreaming +otherCondition.isDefined && hasRangeExpr(otherCondition.get) + case _ => false +} + } + /** * Checks for possible correctness issue in chained stateful operators. The behavior is * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just * print a warning message. */ def checkStreamingQueryGlobalWatermarkLimit( - plan: LogicalPlan, - outputMode: OutputMode): Unit = { + plan: LogicalPlan): Unit = { def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { - case s: Aggregate -if s.isStreaming && outputMode == InternalOutputModes.Append => true case Join(left, right, joinType, _, _) Review Comment: We can remove this line as we support outer join as well. 
We only have issue with stream-stream time interval join and flatMapGroupsWithState. (Arguably flatMapGroupsWithState with all output modes should be disallowed, but I believe we have a separate check for output mode so OK.) ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging { } } + def hasRangeExpr(e: Expression): Boolean = e.exists { +case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) +case _ => false + } + + def hasEventTimeColNeq(neq: Expression): Boolean = { +val exp = neq.asInstanceOf[BinaryComparison] +hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + def hasEventTimeCol(exps: Expression): Boolean = +exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false +} + + // TODO: This function and hasRangeExpr + // should be deleted after we support range join with states + def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = { +plan match { + case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => +left.isStreaming && right.isStreaming Review Comment: maybe missing &&? otherwise this line would be no-op ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala: ## @@ -507,15 +507,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { assertPassOnGlobalWatermarkLimit( s"single $joinType join in Append mode", streamRelation.join(streamRelation, joinType = RightOuter, - condition = Some(attributeWithWatermark === attribute)), Review Comment: The above code comment and related tests are outdated. For (time-window) equality join, (inner, left outer, right outer, and full outer) should just work with recent fix of late record filtering. For time-interval join, all types won't work if there is following stateful operator. 
That said, we should now have redundant test cases for time-interval join specifically to check the different expectation. ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging { } } + def hasRangeExpr(e: Expression): Boolean = e.exists { +case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) +case _ => false + } + + def hasEventTimeColNeq(neq: Expression): Boolean = { +val exp = neq.asInstanceOf[BinaryComparison] +hasEventTimeCol(exp.left) ||