This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 557f9f58817 [SPARK-43327][CORE][3.3] Trigger `committer.setupJob` 
before plan execute in `FileFormatWriter#write`
557f9f58817 is described below

commit 557f9f58817b4d7ceee26069bd46bb779a61bef7
Author: zzzzming95 <[email protected]>
AuthorDate: Tue Aug 22 11:08:00 2023 +0800

    [SPARK-43327][CORE][3.3] Trigger `committer.setupJob` before plan execute 
in `FileFormatWriter#write`
    
    ### What changes were proposed in this pull request?
    
    Trigger `committer.setupJob` before plan execute in `FileFormatWriter#write`
    
    ### Why are the changes needed?
    
    The case where `outputOrdering` might not work if AQE is enabled was 
resolved by the following pull request:
    
    https://github.com/apache/spark/pull/38358
    
    However, since it materializes the AQE plan in advance (triggers 
getFinalPhysicalPlan), it may cause `committer.setupJob(job)` not to be executed 
when `AdaptiveSparkPlanExec#getFinalPhysicalPlan()` fails with an error.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    add UT
    
    Closes #41154 from zzzzming95/spark3-SPARK-43327.
    
    Lead-authored-by: zzzzming95 <[email protected]>
    Co-authored-by: zhiming she <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../execution/datasources/FileFormatWriter.scala   | 20 ++++++++---------
 .../sql/test/DataFrameReaderWriterSuite.scala      | 26 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 12 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 84f3fd360b7..10268996990 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -185,6 +185,16 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )
 
+    SQLExecution.checkSQLExecutionId(sparkSession)
+
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", 
description.uuid)
+
+    // This call shouldn't be put into the `try` block below because it only 
initializes and
+    // prepares the job, any exception thrown from here shouldn't cause 
abortJob() to be called.
+    committer.setupJob(job)
+
     // We should first sort by partition columns, then bucket id, and finally 
sorting columns.
     val requiredOrdering =
       partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ 
sortColumns
@@ -208,16 +218,6 @@ object FileFormatWriter extends Logging {
       }
     }
 
-    SQLExecution.checkSQLExecutionId(sparkSession)
-
-    // propagate the description UUID into the jobs, so that committers
-    // get an ID guaranteed to be unique.
-    job.getConfiguration.set("spark.sql.sources.writeJobUUID", 
description.uuid)
-
-    // This call shouldn't be put into the `try` block below because it only 
initializes and
-    // prepares the job, any exception thrown from here shouldn't cause 
abortJob() to be called.
-    committer.setupJob(job)
-
     try {
       val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
         (materializedPlan.execute(), None)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index c933ab50d21..addf6d2134c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.schema.PrimitiveType
@@ -32,7 +32,7 @@ import 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.Type.Repetition
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkContext, TestUtils}
+import org.apache.spark.{SparkContext, SparkException, TestUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -1275,4 +1275,26 @@ class DataFrameReaderWriterSuite extends QueryTest with 
SharedSparkSession with
       }
     }
   }
+
+  test("SPARK-43327: location exists when insertoverwrite fails") {
+    withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
+      withTable("t", "t1") {
+        sql("CREATE TABLE t(c1 int) USING parquet")
+        sql("CREATE TABLE t1(c2 long) USING parquet")
+        sql("INSERT OVERWRITE TABLE t1 SELECT 6000044164")
+
+        val identifier = TableIdentifier("t")
+        val location = 
spark.sessionState.catalog.getTableMetadata(identifier).location
+
+        intercept[SparkException] {
+          sql("INSERT OVERWRITE TABLE t SELECT c2 FROM " +
+            "(SELECT cast(c2 as int) as c2 FROM t1 distribute by c2)")
+        }
+        // scalastyle:off hadoopconfiguration
+        val fs = FileSystem.get(location, 
spark.sparkContext.hadoopConfiguration)
+        // scalastyle:on hadoopconfiguration
+        assert(fs.exists(new Path(location)))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to