alex-balikov commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1023155184
########## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala: ########## @@ -940,22 +1056,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { def assertPassOnGlobalWatermarkLimit( testNamePostfix: String, plan: LogicalPlan, - outputMode: OutputMode): Unit = { - testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false) + outputMode: OutputMode = OutputMode.Append()): Unit = { + testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = false, outputMode) } def assertFailOnGlobalWatermarkLimit( testNamePostfix: String, plan: LogicalPlan, - outputMode: OutputMode): Unit = { - testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = true) + outputMode: OutputMode = OutputMode.Append()): Unit = { Review Comment: ditto ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging { } } + private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists { + case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) + case _ => false + } + + private def hasEventTimeColNeq(neq: Expression): Boolean = { + val exp = neq.asInstanceOf[BinaryComparison] + hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + private def hasEventTimeCol(exps: Expression): Boolean = + exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false + } + + private def isStatefulOperationPossiblyEmitLateRows( + p: LogicalPlan, outputMode: OutputMode): Boolean = p match { + case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => + left.isStreaming && right.isStreaming && + otherCondition.isDefined && hasRangeExprAgainstEventTimeCol(otherCondition.get) + // FlatMapGroupsWithState configured with event time 
+ case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, _, _, _, _) + if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true + case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _) + if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true + case a: Aggregate if a.isStreaming && outputMode != InternalOutputModes.Append => true + // Since the Distinct node will be replaced to Aggregate in the optimizer rule + // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by + // assuming it as Aggregate. + case d @ Distinct(_: LogicalPlan) if d.isStreaming + && outputMode != InternalOutputModes.Append => true + case _ => false + } + + private def isStreamingStatefulOperation(p: LogicalPlan): Boolean = p match { + case s: Aggregate if s.isStreaming => true + // Since the Distinct node will be replaced to Aggregate in the optimizer rule + // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by + // assuming it as Aggregate. + case d @ Distinct(_: LogicalPlan) if d.isStreaming => true + case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true + case f: FlatMapGroupsWithState if f.isStreaming => true + case f: FlatMapGroupsInPandasWithState if f.isStreaming => true + // Deduplicate also works without event time column even in streaming, + // in such cases, although Dedup is still a stateful operation in a streaming + // query, it could be ignored in all checks below, so let it return false. + case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => true + case _ => false + } /** * Checks for possible correctness issue in chained stateful operators. The behavior is * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just * print a warning message. 
*/ - def checkStreamingQueryGlobalWatermarkLimit( - plan: LogicalPlan, - outputMode: OutputMode): Unit = { - def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { - case s: Aggregate - if s.isStreaming && outputMode == InternalOutputModes.Append => true - case Join(left, right, joinType, _, _) - if left.isStreaming && right.isStreaming && joinType != Inner => true - case f: FlatMapGroupsWithState - if f.isStreaming && f.outputMode == OutputMode.Append() => true - case _ => false - } - - def isStatefulOperation(p: LogicalPlan): Boolean = p match { - case s: Aggregate if s.isStreaming => true - case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true - case f: FlatMapGroupsWithState if f.isStreaming => true - case f: FlatMapGroupsInPandasWithState if f.isStreaming => true - case d: Deduplicate if d.isStreaming => true - case _ => false - } - + def checkStreamingQueryGlobalWatermarkLimit(plan: LogicalPlan, outputMode: OutputMode): Unit = { Review Comment: a comment ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging { } } + private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists { + case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) + case _ => false + } + + private def hasEventTimeColNeq(neq: Expression): Boolean = { Review Comment: What does 'Neq' stand for here: 'not equal'? How does this method know that this is the case? It seems like it can be applied to any binary expression. Maybe add an assert? 
########## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala: ########## @@ -940,22 +1056,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { def assertPassOnGlobalWatermarkLimit( testNamePostfix: String, plan: LogicalPlan, - outputMode: OutputMode): Unit = { - testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false) + outputMode: OutputMode = OutputMode.Append()): Unit = { Review Comment: I would suggest not making Append a default parameter value but specifying it explicitly in each test for better readability ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging { } } + private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists { Review Comment: a short comment on the methods may be helpful ########## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala: ########## @@ -940,22 +1056,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { def assertPassOnGlobalWatermarkLimit( testNamePostfix: String, plan: LogicalPlan, - outputMode: OutputMode): Unit = { - testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false) + outputMode: OutputMode = OutputMode.Append()): Unit = { + testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = false, outputMode) } def assertFailOnGlobalWatermarkLimit( testNamePostfix: String, plan: LogicalPlan, - outputMode: OutputMode): Unit = { - testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = true) + outputMode: OutputMode = OutputMode.Append()): Unit = { + testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = true, outputMode) } def testGlobalWatermarkLimit( testNamePostfix: String, plan: LogicalPlan, - outputMode: OutputMode, - expectFailure: 
Boolean): Unit = { + expectFailure: Boolean, Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org