[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
alex-balikov commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1023155184 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala: ## @@ -940,22 +1056,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { def assertPassOnGlobalWatermarkLimit( testNamePostfix: String, plan: LogicalPlan, - outputMode: OutputMode): Unit = { -testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false) + outputMode: OutputMode = OutputMode.Append()): Unit = { +testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = false, outputMode) } def assertFailOnGlobalWatermarkLimit( testNamePostfix: String, plan: LogicalPlan, - outputMode: OutputMode): Unit = { -testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = true) + outputMode: OutputMode = OutputMode.Append()): Unit = { Review Comment: ditto ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging { } } + private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists { +case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) +case _ => false + } + + private def hasEventTimeColNeq(neq: Expression): Boolean = { +val exp = neq.asInstanceOf[BinaryComparison] +hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + private def hasEventTimeCol(exps: Expression): Boolean = +exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false +} + + private def isStatefulOperationPossiblyEmitLateRows( + p: LogicalPlan, outputMode: OutputMode): Boolean = p match { +case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => + left.isStreaming && right.isStreaming && +otherCondition.isDefined && 
hasRangeExprAgainstEventTimeCol(otherCondition.get) +// FlatMapGroupsWithState configured with event time +case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, _, _, _, _) + if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true +case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _) + if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true +case a: Aggregate if a.isStreaming && outputMode != InternalOutputModes.Append => true +// Since the Distinct node will be replaced to Aggregate in the optimizer rule +// [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by +// assuming it as Aggregate. +case d @ Distinct(_: LogicalPlan) if d.isStreaming + && outputMode != InternalOutputModes.Append => true +case _ => false + } + + private def isStreamingStatefulOperation(p: LogicalPlan): Boolean = p match { +case s: Aggregate if s.isStreaming => true +// Since the Distinct node will be replaced to Aggregate in the optimizer rule +// [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by +// assuming it as Aggregate. +case d @ Distinct(_: LogicalPlan) if d.isStreaming => true +case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true +case f: FlatMapGroupsWithState if f.isStreaming => true +case f: FlatMapGroupsInPandasWithState if f.isStreaming => true +// Deduplicate also works without event time column even in streaming, +// in such cases, although Dedup is still a stateful operation in a streaming +// query, it could be ignored in all checks below, so let it return false. +case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => true +case _ => false + } /** * Checks for possible correctness issue in chained stateful operators. The behavior is * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. * Once it is enabled, an analysis exception will be thrown. 
Otherwise, Spark will just * print a warning message. */ - def checkStreamingQueryGlobalWatermarkLimit( - plan: LogicalPlan, - outputMode: OutputMode): Unit = { -def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { - case s: Aggregate -if s.isStreaming && outputMode == InternalOutputModes.Append => true - case Join(left, right, joinType, _, _) -if left.isStreaming && right.isStreaming && joinType != Inner => true - case f: FlatMapGroupsWithState -if f.isStreaming && f.outputMode == OutputMode.Append() => true - case _ => false -} - -def isStatefulOperation(p: LogicalPlan): Boolean = p match { - case
[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
alex-balikov commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1017413562 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ## @@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { .agg(sum("num")) .as[(String, Long)] -testStream(result, Update)( - AddData(inputData, "a" -> 1), - CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)), - AddData(inputData, "a" -> 1), // Dropped - CheckLastBatch(), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)), - AddData(inputData, "a" -> 2), - CheckLastBatch("a" -> 3L), - assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)), - AddData(inputData, "b" -> 1), - CheckLastBatch("b" -> 1L), - assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L)) -) +// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default Review Comment: Hmm, the above scenario (dropDuplicates -> aggregation) was supported before, so I was wrong: dropDuplicates, and also an inner join with a timestamp equality condition, can be followed by a stateful operator in any output mode, because these operators do not delay the output records. I apologize for the back-and-forth; I think it is important to continue supporting the above scenario.
## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ## @@ -215,20 +220,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { .agg(sum("num")) .as[(String, Long)] -testStream(result, Complete)( - AddData(inputData, "a" -> 1), - CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)), - AddData(inputData, "a" -> 1), // Dropped - CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)), - AddData(inputData, "a" -> 2), - CheckLastBatch("a" -> 3L), - assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)), - AddData(inputData, "b" -> 1), - CheckLastBatch("a" -> 3L, "b" -> 1L), - assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L)) -) +// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default +val exp = intercept[AnalysisException] { Review Comment: same here - the above scenario should work ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala: ## @@ -188,17 +194,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { expectedMsgs = Seq("Complete")) // FlatMapGroupsWithState(Update) in streaming with aggregation - for (outputMode <- Seq(Append, Update, Complete)) { + for (outputMode <- Seq(Update, Complete)) { assertNotSupportedInStreamingPlan( "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " + s"with aggregation in $outputMode mode", TestFlatMapGroupsWithState( null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null, Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)), outputMode = outputMode, - expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with aggregation")) + expectedMsgs = Seq("Multiple stateful operators", "Update", "Complete")) } + assertNotSupportedInStreamingPlan( +"flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation " + + s"with aggregation in Append mode", +TestFlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null, Review Comment: I am not sure what the semantics are of setting the output mode to Update on flatMapGroupsWithState, but it does not match the Append output mode used below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
alex-balikov commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1014428293 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -157,10 +172,11 @@ object UnsupportedOperationChecker extends Logging { // Disallow multiple streaming aggregations val aggregates = collectStreamingAggregates(plan) -if (aggregates.size > 1) { +if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) { throwError( "Multiple streaming aggregations are not supported with " + Review Comment: We also need to change the error message accordingly: 'aggregations' -> 'stateful operators'. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
alex-balikov commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1014425174 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -157,10 +193,11 @@ object UnsupportedOperationChecker extends Logging { // Disallow multiple streaming aggregations val aggregates = collectStreamingAggregates(plan) -if (aggregates.size > 1) { +if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) { Review Comment: Absolutely. I agree that we should allow multiple stateful ops only in append mode. The other modes are not implemented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org