HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging { } } + def hasRangeExpr(e: Expression): Boolean = e.exists { + case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) + case _ => false + } + + def hasEventTimeColNeq(neq: Expression): Boolean = { + val exp = neq.asInstanceOf[BinaryComparison] + hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + def hasEventTimeCol(exps: Expression): Boolean = + exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false + } + + // TODO: This function and hasRangeExpr + // should be deleted after we support range join with states + def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = { + plan match { + case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => + left.isStreaming && right.isStreaming + otherCondition.isDefined && hasRangeExpr(otherCondition.get) + case _ => false + } + } + /** * Checks for possible correctness issue in chained stateful operators. The behavior is * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just * print a warning message. */ def checkStreamingQueryGlobalWatermarkLimit( - plan: LogicalPlan, - outputMode: OutputMode): Unit = { + plan: LogicalPlan): Unit = { def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { - case s: Aggregate - if s.isStreaming && outputMode == InternalOutputModes.Append => true case Join(left, right, joinType, _, _) Review Comment: We can remove this line as we support outer join as well. We only have issue with stream-stream time interval join (with all types) and flatMapGroupsWithState. 
(Arguably flatMapGroupsWithState with all output modes should be disallowed, but I believe we have a separate check for output mode so OK.) ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging { } } + def hasRangeExpr(e: Expression): Boolean = e.exists { + case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) + case _ => false + } + + def hasEventTimeColNeq(neq: Expression): Boolean = { + val exp = neq.asInstanceOf[BinaryComparison] + hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + def hasEventTimeCol(exps: Expression): Boolean = + exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false + } + + // TODO: This function and hasRangeExpr + // should be deleted after we support range join with states + def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = { + plan match { + case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => + left.isStreaming && right.isStreaming + otherCondition.isDefined && hasRangeExpr(otherCondition.get) + case _ => false + } + } + /** * Checks for possible correctness issue in chained stateful operators. The behavior is * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just * print a warning message. */ def checkStreamingQueryGlobalWatermarkLimit( - plan: LogicalPlan, - outputMode: OutputMode): Unit = { + plan: LogicalPlan): Unit = { def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { - case s: Aggregate - if s.isStreaming && outputMode == InternalOutputModes.Append => true case Join(left, right, joinType, _, _) Review Comment: We can remove this line as we support outer join as well. 
We only have an issue with stream-stream time interval joins (with all join types) and flatMapGroupsWithState. (Arguably, flatMapGroupsWithState with all output modes should be disallowed, but I believe we will have a separate check for output mode, so that is OK.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org