This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 442d38478b [GLUTEN-9801] Delete the written file if the task failed
(#9808)
442d38478b is described below
commit 442d38478ba1edb2d5ce0c06df6702e32a706111
Author: JiaKe <[email protected]>
AuthorDate: Fri May 30 20:21:03 2025 +0800
[GLUTEN-9801] Delete the written file if the task failed (#9808)
Adopt vanilla Spark's implementation in Gluten to clean up the
written file if the task fails.
---
.../apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala | 5 ++++-
.../org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala | 2 +-
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
index 845f2f98fb..197b57f592 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
@@ -97,8 +97,11 @@ class SparkWriteFilesCommitProtocol(
}
}
- def abortTask(): Unit = {
+ def abortTask(writePath: String): Unit = {
committer.abortTask(taskAttemptContext)
+
+ val tmpPath = new Path(writePath)
+ tmpPath.getFileSystem(taskAttemptContext.getConfiguration).delete(tmpPath,
true)
}
// Copied from `SparkHadoopWriterUtils.createJobID` to be compatible with
multi-version
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 88937b7d32..8d8e4b9276 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -220,7 +220,7 @@ class VeloxColumnarWriteFilesRDD(
})(
catchBlock = {
// If there is an error, abort the task
- commitProtocol.abortTask()
+ commitProtocol.abortTask(writePath)
logError(s"Job ${commitProtocol.getJobId} aborted.")
}
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]