This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 885f4733c41 [SPARK-38753][SQL][TESTS] Move the tests for `WRITING_JOB_ABORTED` to `QueryExecutionErrorsSuite`
885f4733c41 is described below

commit 885f4733c413bdbb110946361247fbbd19f6bba9
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Thu Apr 14 15:36:07 2022 +0300

    [SPARK-38753][SQL][TESTS] Move the tests for `WRITING_JOB_ABORTED` to `QueryExecutionErrorsSuite`
    
    ### What changes were proposed in this pull request?
    Move the test for the error class `WRITING_JOB_ABORTED` from `DataSourceV2Suite.scala` to `QueryExecutionErrorsSuite`.
    
    ### Why are the changes needed?
    To improve code maintenance: all tests for error classes are placed in the Query.*ErrorsSuite suites. Also, the exception is raised from [QueryExecutionErrors](https://github.com/apache/spark/blob/073fd2ad5c16d193725954e76ce357e4a9d97449/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala#L665-L670), so the tests should be in `QueryExecutionErrorsSuite` for consistency.
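    
    For context, here is a sketch of how the linked method raises this error. It mirrors the linked revision, but treat the exact name and signature (`writingJobAbortedError`) as an approximation rather than the authoritative source:
    ```scala
    // Sketch of the linked method in org.apache.spark.sql.errors.QueryExecutionErrors.
    // It wraps the original failure in a SparkException tagged with the error class;
    // the SQLSTATE 40000 asserted in the test comes from the error-class definition.
    def writingJobAbortedError(e: Throwable): Throwable = {
      new SparkException(
        errorClass = "WRITING_JOB_ABORTED",
        messageParameters = Array.empty,
        cause = e)
    }
    ```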
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "sql/test:testOnly *QueryExecutionErrorsSuite"
    $ build/sbt "sql/test:testOnly *DataSourceV2Suite"
    ```
    
    Closes #36196 from MaxGekk/move-tests-WRITING_JOB_ABORTED.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/connector/DataSourceV2Suite.scala    | 24 ----------------
 .../sql/errors/QueryExecutionErrorsSuite.scala     | 33 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 44d4f1fa825..3fefaf72df4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -22,7 +22,6 @@ import java.util.OptionalLong
 
 import test.org.apache.spark.sql.connector._
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
@@ -349,29 +348,6 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
             .option("path", path).mode("error").save()
         }
         assert(e2.getMessage.contains("please use Append or Overwrite modes instead"))
-
-        // test transaction
-        val failingUdf = org.apache.spark.sql.functions.udf {
-          var count = 0
-          (id: Long) => {
-            if (count > 5) {
-              throw new RuntimeException("testing error")
-            }
-            count += 1
-            id
-          }
-        }
-        // this input data will fail to read middle way.
-        val input = spark.range(15).select(failingUdf($"id").as(Symbol("i")))
-          .select($"i", -$"i" as Symbol("j"))
-        val e3 = intercept[SparkException] {
-          input.write.format(cls.getName).option("path", path).mode("overwrite").save()
-        }
-        assert(e3.getMessage.contains("Writing job aborted"))
-        assert(e3.getErrorClass == "WRITING_JOB_ABORTED")
-        assert(e3.getSqlState == "40000")
-        // make sure we don't have partial data.
-        assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index e47e0823536..77eb6b28d54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -19,9 +19,12 @@ package org.apache.spark.sql.errors
 
 import java.util.Locale
 
+import test.org.apache.spark.sql.connector.JavaSimpleWritableDataSource
+
 import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalStateException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
 import org.apache.spark.sql.{DataFrame, QueryTest}
 import org.apache.spark.sql.catalyst.util.BadRecordException
+import org.apache.spark.sql.connector.SimpleWritableDataSource
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.datasources.orc.OrcTest
 import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
@@ -324,4 +327,34 @@ class QueryExecutionErrorsSuite extends QueryTest
     assert(e5.getSqlState === "42000")
     assert(e5.getMessage === "Cannot parse decimal")
   }
+
+  test("WRITING_JOB_ABORTED: read of input data fails in the middle") {
+    Seq(classOf[SimpleWritableDataSource], classOf[JavaSimpleWritableDataSource]).foreach { cls =>
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+        assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
+        // test transaction
+        val failingUdf = org.apache.spark.sql.functions.udf {
+          var count = 0
+          (id: Long) => {
+            if (count > 5) {
+              throw new RuntimeException("testing error")
+            }
+            count += 1
+            id
+          }
+        }
+        val input = spark.range(15).select(failingUdf($"id").as(Symbol("i")))
+          .select($"i", -$"i" as Symbol("j"))
+        val e = intercept[SparkException] {
+          input.write.format(cls.getName).option("path", path).mode("overwrite").save()
+        }
+        assert(e.getMessage === "Writing job aborted")
+        assert(e.getErrorClass === "WRITING_JOB_ABORTED")
+        assert(e.getSqlState === "40000")
+        // make sure we don't have partial data.
+        assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
+      }
+    }
+  }
 }

