jaceklaskowski commented on code in PR #41578: URL: https://github.com/apache/spark/pull/41578#discussion_r1229274898
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -509,7 +509,18 @@ object UnsupportedOperationChecker extends Logging { throwError("Sampling is not supported on streaming DataFrames/Datasets") case Window(_, _, _, child) if child.isStreaming => - throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets") + val windowFuncs = subPlan.asInstanceOf[Window].projectList.flatMap { e => + e.collect { + case we: WindowExpression => s"${we.windowFunction} AS ${e.toAttribute.sql}" + } + }.mkString(", ") + throw new AnalysisException( + s"Unsupported window function found in column '$windowFuncs'. Structured " + Review Comment: nit: Drop the word `found` from the message, so that it reads "Unsupported window function in column '...'". ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala: ########## @@ -684,6 +685,26 @@ class StreamSuite extends StreamTest { assert(query.exception.isEmpty) } + test("SPARK-44044: non-time-window") { + val inputData = MemoryStream[(Int, Int)] + val e = intercept[AnalysisException] { + val agg = inputData + .toDF() + .selectExpr("CAST(_1 AS timestamp) AS col1", "_2 AS col2") + .withWatermark("col1", "10 seconds") + .withColumn("rn_col", row_number().over(Window + .partitionBy("col1") + .orderBy(col("col2")))) + .select("rn_col", "col1", "col2") + .writeStream + .format("console") + .start() + } + assert(e.getMessage.contains( + "Unsupported window function found in column 'row_number() AS rn_col'")) Review Comment: Interesting... should the error message include the `OVER` spec? 🤔 That may not be specific to this change, but rather a general issue with window aggregation in Spark SQL. 
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -509,7 +509,18 @@ object UnsupportedOperationChecker extends Logging { throwError("Sampling is not supported on streaming DataFrames/Datasets") case Window(_, _, _, child) if child.isStreaming => - throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets") + val windowFuncs = subPlan.asInstanceOf[Window].projectList.flatMap { e => + e.collect { + case we: WindowExpression => s"${we.windowFunction} AS ${e.toAttribute.sql}" + } + }.mkString(", ") + throw new AnalysisException( + s"Unsupported window function found in column '$windowFuncs'. Structured " + + "streaming only supports time-window aggregation using the `window` function.", Review Comment: nit: Capitalize "Streaming"; the product name is "Structured Streaming". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org