MaxGekk commented on code in PR #41578: URL: https://github.com/apache/spark/pull/41578#discussion_r1247460708
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -19,11 +19,14 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, WindowExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLId +import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLStmt Review Comment: Could you pass sequences of (windowFunc, columnName, windowSpec), and do quoting inside of `QueryExecutionErrors`, please. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -508,8 +511,27 @@ object UnsupportedOperationChecker extends Logging { case Sample(_, _, _, _, child) if child.isStreaming => throwError("Sampling is not supported on streaming DataFrames/Datasets") - case Window(_, _, _, child) if child.isStreaming => - throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets") + case Window(windowExpression, _, _, child) if child.isStreaming => + val windowFunc = windowExpression.flatMap { e => + e.collect { + case we: WindowExpression => toSQLStmt(we.windowFunction.toString) + } + }.mkString(", ") + val columnName = windowExpression.flatMap { e => + e.collect { + case we: WindowExpression => toSQLId(e.toAttribute.sql) + } + }.mkString(", ") + val windowSpec = windowExpression.flatMap { e => + e.collect { + case we: WindowExpression => toSQLStmt(we.windowSpec.sql) + } + }.mkString(", ") Review Comment: Let's merge all of them and return (we.windowFunction, e.toAttribute, we.windowSpec) ########## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala: ########## @@ -724,6 +724,20 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { messageParameters = Map("className" -> className, "operator" -> operator)) } + def nonTimeWindowNotSupportedInStreamingError( + windowFunc: String, + columnName: String, + windowSpec: String, + origin: Origin): AnalysisException = { + new AnalysisException( + errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING", + messageParameters = Map( + "windowFunc" -> windowFunc, + "columnName" -> columnName, + "windowSpec" -> windowSpec), Review Comment: Better to perform quoting here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org