jaceklaskowski commented on code in PR #41578:
URL: https://github.com/apache/spark/pull/41578#discussion_r1229274898


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -509,7 +509,18 @@ object UnsupportedOperationChecker extends Logging {
           throwError("Sampling is not supported on streaming 
DataFrames/Datasets")
 
         case Window(_, _, _, child) if child.isStreaming =>
-          throwError("Non-time-based windows are not supported on streaming 
DataFrames/Datasets")
+          val windowFuncs = subPlan.asInstanceOf[Window].projectList.flatMap { 
e =>
+            e.collect {
+              case we: WindowExpression => s"${we.windowFunction} AS 
${e.toAttribute.sql}"
+            }
+          }.mkString(", ")
+          throw new AnalysisException(
+            s"Unsupported window function found in column '$windowFuncs'. 
Structured " +

Review Comment:
   nit: Remove the word `found` from the error message.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala:
##########
@@ -684,6 +685,26 @@ class StreamSuite extends StreamTest {
     assert(query.exception.isEmpty)
   }
 
+  test("SPARK-44044: non-time-window") {
+    val inputData = MemoryStream[(Int, Int)]
+    val e = intercept[AnalysisException] {
+      val agg = inputData
+        .toDF()
+        .selectExpr("CAST(_1 AS timestamp) AS col1", "_2 AS col2")
+        .withWatermark("col1", "10 seconds")
+        .withColumn("rn_col", row_number().over(Window
+          .partitionBy("col1")
+          .orderBy(col("col2"))))
+        .select("rn_col", "col1", "col2")
+        .writeStream
+        .format("console")
+        .start()
+    }
+  assert(e.getMessage.contains(
+    "Unsupported window function found in column 'row_number() AS rn_col'"))

Review Comment:
   Interesting... should the error message include the `OVER` spec? 🤔 This might not 
necessarily be related to this change, but rather be an issue with window aggregation in 
Spark SQL in general.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -509,7 +509,18 @@ object UnsupportedOperationChecker extends Logging {
           throwError("Sampling is not supported on streaming 
DataFrames/Datasets")
 
         case Window(_, _, _, child) if child.isStreaming =>
-          throwError("Non-time-based windows are not supported on streaming 
DataFrames/Datasets")
+          val windowFuncs = subPlan.asInstanceOf[Window].projectList.flatMap { 
e =>
+            e.collect {
+              case we: WindowExpression => s"${we.windowFunction} AS 
${e.toAttribute.sql}"
+            }
+          }.mkString(", ")
+          throw new AnalysisException(
+            s"Unsupported window function found in column '$windowFuncs'. 
Structured " +
+            "streaming only supports time-window aggregation using the 
`window` function.",

Review Comment:
   nit: Capitalize `Streaming` (i.e. "Structured Streaming", upper case).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to