This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f406b54b2a8 [SPARK-44044][SS] Improve Error message for Window functions with streaming
f406b54b2a8 is described below

commit f406b54b2a899d03bae2e6f70eef7fedfed63d65
Author: Siying Dong <siying.d...@databricks.com>
AuthorDate: Sat Jul 1 08:51:22 2023 +0300

    [SPARK-44044][SS] Improve Error message for Window functions with streaming
    
    ### What changes were proposed in this pull request?
    Replace the existing error message shown when a non-time window function is used with streaming, so that it includes the window function and the column name. The error message now looks like the following:
    
    org.apache.spark.sql.AnalysisException: Window function is not supported in 'row_number()' as column 'rn_col' on streaming DataFrames/Datasets. Structured Streaming only supports time-window aggregation using the `window` function. (window specification: '(PARTITION BY col1 ORDER BY col2 ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)')
    
    Note that the message reads a little unnaturally because the existing unit test requires the exception message to include "not supported", "streaming", "DataFrames" and "Dataset".
    
    ### Why are the changes needed?
    The existing error message is vague and includes the full logical plan. A user reported that they were not able to identify what the problem is.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added a unit test.
    
    Closes #41578 from siying/window_error.
    
    Lead-authored-by: Siying Dong <siying.d...@databricks.com>
    Co-authored-by: Siying Dong <dong...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    |  5 ++++
 .../analysis/UnsupportedOperationChecker.scala     | 17 ++++++++++---
 .../spark/sql/errors/QueryExecutionErrors.scala    | 16 ++++++++++++-
 .../analysis/UnsupportedOperationsSuite.scala      | 24 ++++++++++++++-----
 .../apache/spark/sql/streaming/StreamSuite.scala   | 28 ++++++++++++++++++++++
 5 files changed, 80 insertions(+), 10 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index eabd5533e13..14bd3bc6bac 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1775,6 +1775,11 @@
     ],
     "sqlState" : "42000"
   },
+  "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING" : {
+    "message" : [
+      "Window function is not supported in <windowFunc> (as column 
<columnName>) on streaming DataFrames/Datasets. Structured Streaming only 
supports time-window aggregation using the WINDOW function. (window 
specification: <windowSpec>)"
+    ]
+  },
   "NOT_ALLOWED_IN_FROM" : {
     "message" : [
       "Not allowed in the FROM clause:"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index daa7c0d54b7..2a09d85d8f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, WindowExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
 
@@ -508,8 +509,18 @@ object UnsupportedOperationChecker extends Logging {
         case Sample(_, _, _, _, child) if child.isStreaming =>
           throwError("Sampling is not supported on streaming 
DataFrames/Datasets")
 
-        case Window(_, _, _, child) if child.isStreaming =>
-          throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets")
+        case Window(windowExpression, _, _, child) if child.isStreaming =>
+          val (windowFuncList, columnNameList, windowSpecList) = windowExpression.flatMap { e =>
+            e.collect {
+              case we: WindowExpression =>
+                (we.windowFunction.toString, e.toAttribute.sql, we.windowSpec.sql)
+            }
+          }.unzip3
+          throw QueryExecutionErrors.nonTimeWindowNotSupportedInStreamingError(
+            windowFuncList,
+            columnNameList,
+            windowSpecList,
+            subPlan.origin)
 
         case ReturnAnswer(child) if child.isStreaming =>
           throwError("Cannot return immediate result on streaming 
DataFrames/Dataset. Queries " +
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 59b66bd4343..74c29cabbe1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval
-import org.apache.spark.sql.catalyst.trees.{SQLQueryContext, TreeNode}
+import org.apache.spark.sql.catalyst.trees.{Origin, SQLQueryContext, TreeNode}
 import org.apache.spark.sql.catalyst.util.{sideBySide, BadRecordException, DateTimeUtils, FailFastMode}
 import org.apache.spark.sql.connector.catalog.{CatalogNotFoundException, Table, TableProvider}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -724,6 +724,20 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       messageParameters = Map("className" -> className, "operator" -> 
operator))
   }
 
+  def nonTimeWindowNotSupportedInStreamingError(
+      windowFuncList: Seq[String],
+      columnNameList: Seq[String],
+      windowSpecList: Seq[String],
+      origin: Origin): AnalysisException = {
+    new AnalysisException(
+      errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING",
+      messageParameters = Map(
+        "windowFunc" -> windowFuncList.map(toSQLStmt(_)).mkString(","),
+        "columnName" -> columnNameList.map(toSQLId(_)).mkString(","),
+        "windowSpec" -> windowSpecList.map(toSQLStmt(_)).mkString(",")),
+      origin = origin)
+  }
+
   def multiplePathsSpecifiedError(allPaths: Seq[String]): SparkIllegalArgumentException = {
     new SparkIllegalArgumentException(
       errorClass = "_LEGACY_ERROR_TEMP_2050",
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index f9fd02b86e9..32c9a3aa17e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -738,7 +738,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
   testUnaryOperatorInStreamingPlan(
     "sample", Sample(0.1, 1, true, 1L, _), expectedMsg = "sampling")
   testUnaryOperatorInStreamingPlan(
-    "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
+    "window",
+    Window(Nil, Nil, Nil, _),
+    errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING")
 
   // Output modes with aggregation and non-aggregation plans
   testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true)
@@ -870,7 +872,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
     operationName: String,
     logicalPlanGenerator: LogicalPlan => LogicalPlan,
     outputMode: OutputMode = Append,
-    expectedMsg: String = ""): Unit = {
+    expectedMsg: String = "",
+    errorClass: String = ""): Unit = {
 
     val expectedMsgs = if (expectedMsg.isEmpty) Seq(operationName) else Seq(expectedMsg)
 
@@ -878,7 +881,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       s"$operationName with stream relation",
       wrapInStreaming(logicalPlanGenerator(streamRelation)),
       outputMode,
-      expectedMsgs)
+      expectedMsgs,
+      errorClass)
 
     assertSupportedInStreamingPlan(
       s"$operationName with batch relation",
@@ -1025,10 +1029,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       name: String,
       plan: LogicalPlan,
       outputMode: OutputMode,
-      expectedMsgs: Seq[String]): Unit = {
+      expectedMsgs: Seq[String],
+      errorClass: String = ""): Unit = {
     testError(
       s"streaming plan - $name: not supported",
-      expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not 
supported") {
+      expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not 
supported",
+      errorClass) {
       UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
     }
   }
@@ -1090,7 +1096,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
   * Test whether the body of code will fail. If it does fail, then check if it has expected
    * messages.
    */
-  def testError(testName: String, expectedMsgs: Seq[String])(testBody: => Unit): Unit = {
+  def testError(
+      testName: String,
+      expectedMsgs: Seq[String],
+      errorClass: String = "")(testBody: => Unit): Unit = {
 
     test(testName) {
       val e = intercept[AnalysisException] {
@@ -1102,6 +1111,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
             s"actual exception message:\n\t'${e.getMessage}'")
         }
       }
+      if (!errorClass.isEmpty) {
+        assert(e.getErrorClass == errorClass)
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6fd63454e82..0ee44a098f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{ContinuousMemoryStream, MemorySink}
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
@@ -686,6 +687,33 @@ class StreamSuite extends StreamTest {
     assert(query.exception.isEmpty)
   }
 
+  test("SPARK-44044: non-time-window") {
+    val inputData = MemoryStream[(Int, Int)]
+    val e = intercept[AnalysisException] {
+      val agg = inputData
+        .toDF()
+        .selectExpr("CAST(_1 AS timestamp) AS col1", "_2 AS col2")
+        .withWatermark("col1", "10 seconds")
+        .withColumn("rn_col", row_number().over(Window
+          .partitionBy("col1")
+          .orderBy(col("col2"))))
+        .select("rn_col", "col1", "col2")
+        .writeStream
+        .format("console")
+        .start()
+    }
+    checkError(
+      e,
+      "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING",
+      parameters = Map(
+        "windowFunc" -> "ROW_NUMBER()",
+        "columnName" -> "`rn_col`",
+        "windowSpec" ->
+          ("(PARTITION BY COL1 ORDER BY COL2 ASC NULLS FIRST ROWS BETWEEN 
UNBOUNDED PRECEDING " +
+          "AND CURRENT ROW)")))
+  }
+
 
   test("SPARK-19873: streaming aggregation with change in number of 
partitions") {
     val inputData = MemoryStream[(Int, Int)]
     val agg = inputData.toDS().groupBy("_1").count()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
