MaxGekk commented on code in PR #41578: URL: https://github.com/apache/spark/pull/41578#discussion_r1245492304
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -508,8 +509,22 @@ object UnsupportedOperationChecker extends Logging { case Sample(_, _, _, _, child) if child.isStreaming => throwError("Sampling is not supported on streaming DataFrames/Datasets") - case Window(_, _, _, child) if child.isStreaming => - throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets") + case Window(windowExpression, _, _, child) if child.isStreaming => + val windowFuncs = windowExpression.flatMap { e => + e.collect { + case we: WindowExpression => + s"'${we.windowFunction}' as column '${e.toAttribute.sql}'" Review Comment: Could you change the quoting of the expression and the id to use `toSQLExpr` and `toSQLId`? ########## core/src/main/resources/error/error-classes.json: ########## @@ -1695,6 +1695,11 @@ ], "sqlState" : "42000" }, + "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING" : { + "message" : [ + "Window function is not supported in <windowFunc> on streaming DataFrames/Datasets. Structured Streaming only supports time-window aggregation using the `window` function. (window specification: <windowSpec>)" Review Comment: ```suggestion "Window function is not supported in <windowFunc> on streaming DataFrames/Datasets. Structured Streaming only supports time-window aggregation using the WINDOW function. 
(window specification: <windowSpec>)" ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala: ########## @@ -724,6 +724,18 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { messageParameters = Map("className" -> className, "operator" -> operator)) } + def nonTimeWindowNotSupportedInStreamingError( + windowFunc: String, + windowSpec: String, + origin: Origin): AnalysisException = { + new AnalysisException( + errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING", + messageParameters = Map( + "windowFunc" -> windowFunc, + "windowSpec" -> toSQLValue(windowSpec, StringType)), Review Comment: ```suggestion "windowSpec" -> toSQLStmt(windowSpec)), ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -508,8 +509,22 @@ object UnsupportedOperationChecker extends Logging { case Sample(_, _, _, _, child) if child.isStreaming => throwError("Sampling is not supported on streaming DataFrames/Datasets") - case Window(_, _, _, child) if child.isStreaming => - throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets") + case Window(windowExpression, _, _, child) if child.isStreaming => + val windowFuncs = windowExpression.flatMap { e => + e.collect { + case we: WindowExpression => + s"'${we.windowFunction}' as column '${e.toAttribute.sql}'" Review Comment: And just pass both to `nonTimeWindowNotSupportedInStreamingError`, and move the `as column` text into error-classes.json. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org