This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 557f9f58817 [SPARK-43327][CORE][3.3] Trigger `committer.setupJob` before plan execute in `FileFormatWriter#write`
557f9f58817 is described below
commit 557f9f58817b4d7ceee26069bd46bb779a61bef7
Author: zzzzming95 <[email protected]>
AuthorDate: Tue Aug 22 11:08:00 2023 +0800
[SPARK-43327][CORE][3.3] Trigger `committer.setupJob` before plan execute in `FileFormatWriter#write`
### What changes were proposed in this pull request?
Trigger `committer.setupJob` before plan execute in `FileFormatWriter#write`
### Why are the changes needed?
https://github.com/apache/spark/pull/38358 resolved the case where `outputOrdering` might not take effect when AQE is enabled.
However, because that change materializes the AQE plan in advance (it triggers `getFinalPhysicalPlan`), `committer.setupJob(job)` may never run when `AdaptiveSparkPlanExec#getFinalPhysicalPlan()` fails with an error.
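Below is a minimal, self-contained sketch (not the Spark source) of the call-order change; the `Committer` trait and helper names are illustrative only, and only the ordering of `setupJob` relative to plan materialization mirrors this patch.
object SetupJobOrderingSketch {
  trait Committer { def setupJob(): Unit }

  // After the patch, job setup runs before the plan is materialized, so a failure
  // such as AdaptiveSparkPlanExec#getFinalPhysicalPlan() throwing can no longer skip it.
  def write(committer: Committer, materializePlan: () => Unit): Unit = {
    committer.setupJob()   // moved before plan materialization
    materializePlan()      // may throw; setupJob has already run by this point
    // ... the real code then submits the write job and handles commitJob/abortJob ...
  }

  def main(args: Array[String]): Unit = {
    val committer = new Committer {
      def setupJob(): Unit = println("setupJob ran")
    }
    try write(committer, () => sys.error("plan materialization failed"))
    catch { case e: RuntimeException => println("write failed after setup: " + e.getMessage) }
  }
}
Running the sketch prints "setupJob ran" before the failure, which is in line with what the new test below verifies: even though the insert fails, the table location still exists because job setup already happened.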
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
add UT
Closes #41154 from zzzzming95/spark3-SPARK-43327.
Lead-authored-by: zzzzming95 <[email protected]>
Co-authored-by: zhiming she <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/datasources/FileFormatWriter.scala | 20 ++++++++---------
.../sql/test/DataFrameReaderWriterSuite.scala | 26 ++++++++++++++++++++--
2 files changed, 34 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 84f3fd360b7..10268996990 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -185,6 +185,16 @@ object FileFormatWriter extends Logging {
statsTrackers = statsTrackers
)
+ SQLExecution.checkSQLExecutionId(sparkSession)
+
+ // propagate the description UUID into the jobs, so that committers
+ // get an ID guaranteed to be unique.
+ job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
+ // This call shouldn't be put into the `try` block below because it only initializes and
+ // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
+ committer.setupJob(job)
+
// We should first sort by partition columns, then bucket id, and finally sorting columns.
val requiredOrdering =
partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
@@ -208,16 +218,6 @@ object FileFormatWriter extends Logging {
}
}
- SQLExecution.checkSQLExecutionId(sparkSession)
-
- // propagate the description UUID into the jobs, so that committers
- // get an ID guaranteed to be unique.
- job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
-
- // This call shouldn't be put into the `try` block below because it only initializes and
- // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
- committer.setupJob(job)
-
try {
val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
(materializedPlan.execute(), None)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index c933ab50d21..addf6d2134c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.PrimitiveType
@@ -32,7 +32,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkContext, TestUtils}
+import org.apache.spark.{SparkContext, SparkException, TestUtils}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -1275,4 +1275,26 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
}
}
}
+
+ test("SPARK-43327: location exists when insertoverwrite fails") {
+ withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
+ withTable("t", "t1") {
+ sql("CREATE TABLE t(c1 int) USING parquet")
+ sql("CREATE TABLE t1(c2 long) USING parquet")
+ sql("INSERT OVERWRITE TABLE t1 SELECT 6000044164")
+
+ val identifier = TableIdentifier("t")
+ val location = spark.sessionState.catalog.getTableMetadata(identifier).location
+
+ intercept[SparkException] {
+ sql("INSERT OVERWRITE TABLE t SELECT c2 FROM " +
+ "(SELECT cast(c2 as int) as c2 FROM t1 distribute by c2)")
+ }
+ // scalastyle:off hadoopconfiguration
+ val fs = FileSystem.get(location, spark.sparkContext.hadoopConfiguration)
+ // scalastyle:on hadoopconfiguration
+ assert(fs.exists(new Path(location)))
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]