HeartSaVioR commented on code in PR #41578:
URL: https://github.com/apache/spark/pull/41578#discussion_r1234534233


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -509,7 +509,24 @@ object UnsupportedOperationChecker extends Logging {
           throwError("Sampling is not supported on streaming DataFrames/Datasets")
 
         case Window(_, _, _, child) if child.isStreaming =>

Review Comment:
   Looks like we don't even need the cast... we can just extract the value 
directly in the pattern.
   
   ```
   case Window(windowExpressions, _, _, child) if child.isStreaming =>
   ```
   
   and then remove L512 and use `windowExpressions` directly.
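   
   A minimal, self-contained sketch of the idea. `Plan`, `Leaf`, and this 
four-field `Window` are simplified stand-ins I made up for illustration, not 
Spark's actual classes; only the shape of the pattern matches the suggestion.
   
   ```scala
   object PatternExtractionSketch {
     // Hypothetical stand-ins for logical plan nodes (NOT Spark's real classes).
     sealed trait Plan { def isStreaming: Boolean }
     case class Leaf(isStreaming: Boolean) extends Plan
     case class Window(
         windowExpressions: Seq[String],
         partitionSpec: Seq[String],
         orderSpec: Seq[String],
         child: Plan) extends Plan {
       def isStreaming: Boolean = child.isStreaming
     }

     // Binding the first field in the pattern makes a later cast unnecessary.
     def streamingWindowExprs(subPlan: Plan): Seq[String] = subPlan match {
       case Window(windowExpressions, _, _, child) if child.isStreaming =>
         windowExpressions
       case _ => Seq.empty
     }
   }
   ```
   
   With this shape, the name bound in the pattern is already typed as the 
field's type, so nothing has to be read back from `subPlan`.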



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -509,7 +509,24 @@ object UnsupportedOperationChecker extends Logging {
           throwError("Sampling is not supported on streaming DataFrames/Datasets")
 
         case Window(_, _, _, child) if child.isStreaming =>

Review Comment:
   `case w @ Window(_, _, _, child) if child.isStreaming =>`
   
   `w` would be the same as `subPlan.asInstanceOf[Window]`.
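   
   A small sketch to illustrate the equivalence, using made-up stand-in types 
(`Plan`, `Leaf`, and a two-field `Window`) rather than Spark's real classes:
   
   ```scala
   object BinderSketch {
     // Hypothetical stand-ins for logical plan nodes (NOT Spark's real classes).
     sealed trait Plan { def isStreaming: Boolean }
     case class Leaf(isStreaming: Boolean) extends Plan
     case class Window(windowExpressions: Seq[String], child: Plan) extends Plan {
       def isStreaming: Boolean = child.isStreaming
     }

     // `w @ ...` binds the matched node, already typed as Window.
     def viaBinder(subPlan: Plan): Option[Window] = subPlan match {
       case w @ Window(_, child) if child.isStreaming => Some(w)
       case _ => None
     }

     // The cast yields the same object, but the compiler cannot check it.
     def viaCast(subPlan: Plan): Option[Window] = subPlan match {
       case Window(_, child) if child.isStreaming =>
         Some(subPlan.asInstanceOf[Window])
       case _ => None
     }
   }
   ```
   
   Both functions return the very same instance; the binder version just lets 
the compiler prove the type instead of trusting a runtime cast.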



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -509,7 +509,24 @@ object UnsupportedOperationChecker extends Logging {
           throwError("Sampling is not supported on streaming DataFrames/Datasets")
 
         case Window(_, _, _, child) if child.isStreaming =>
-          throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets")
+          val windowExpression = subPlan.asInstanceOf[Window].windowExpressions
+          val windowFuncs = windowExpression.flatMap { e =>
+            e.collect {
+              case we: WindowExpression =>
+                s"'${we.windowFunction}' as column '${e.toAttribute.sql}'"
+            }
+          }.mkString(", ")
+          val windowSpec = windowExpression.flatMap { e =>
+            e.collect {
+              case we: WindowExpression => we.windowSpec.sql
+            }
+          }.mkString(", ")
+          throw new AnalysisException(
+            s"Window function is not supported in $windowFuncs on streaming DataFrames/Datasets. " +
+            "Structured Streaming only supports time-window aggregation using the `window` " +
+            s"function. (window specification: '$windowSpec')",

Review Comment:
   @siying 
   The change itself seems to be OK.
   
   We are applying the error class framework whenever a change touches an 
exception. Although we haven't made any progress on 
UnsupportedOperationChecker yet, we could treat this change as a starting point.
   
   Please look into the recent PR on error class migration in Structured 
Streaming, especially `error-classes.json` and `QueryExecutionErrors.scala`.
   https://github.com/apache/spark/pull/41205
   
   Here is the guide doc for the error class framework:
   https://github.com/apache/spark/blob/master/core/src/main/resources/error/README.md
   
   Please go through and apply it. Thanks!
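   
   To give a rough idea of the shape (not the actual Spark API), here is a toy, 
self-contained sketch: a named error class maps to a message template, and the 
exception carries the class name plus its parameters. Everything here is 
hypothetical, including the `HYPOTHETICAL_WINDOW_ON_STREAMING` name, the 
`<name>`-style placeholders, and the `SketchAnalysisException` type; please 
follow the README and the linked PR for the real conventions.
   
   ```scala
   object ErrorClassSketch {
     // Hypothetical stand-in for an entry in error-classes.json.
     val templates: Map[String, String] = Map(
       "HYPOTHETICAL_WINDOW_ON_STREAMING" ->
         ("Window function is not supported in <windowFunc> on streaming " +
           "DataFrames/Datasets. (window specification: '<windowSpec>')"))

     // Fill each <key> placeholder in the template with its parameter value.
     def render(errorClass: String, params: Map[String, String]): String =
       params.foldLeft(templates(errorClass)) { case (msg, (key, value)) =>
         msg.replace(s"<$key>", value)
       }

     // Toy exception that keeps the error class and parameters machine-readable.
     final class SketchAnalysisException(
         val errorClass: String,
         val messageParameters: Map[String, String])
       extends Exception(render(errorClass, messageParameters))

     // Hypothetical helper, in the spirit of a central errors object.
     def windowOnStreamingError(
         windowFunc: String,
         windowSpec: String): SketchAnalysisException =
       new SketchAnalysisException(
         "HYPOTHETICAL_WINDOW_ON_STREAMING",
         Map("windowFunc" -> windowFunc, "windowSpec" -> windowSpec))
   }
   ```
   
   The point of the design is that the call site passes only structured 
parameters, while the wording lives in one place and tests can assert on the 
error class instead of the message text.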



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

