This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ee153ed6fa [GLUTEN-11088][VL] Fix GlutenParquetIOSuite compatibility
issues for Spark 4.0 (#11281)
ee153ed6fa is described below
commit ee153ed6fa877bbde6986c5eae8b991da8d7b73d
Author: Chang chen <[email protected]>
AuthorDate: Wed Dec 17 14:55:33 2025 +0800
[GLUTEN-11088][VL] Fix GlutenParquetIOSuite compatibility issues for Spark
4.0 (#11281)
* Replace direct exception throwing with `SparkShims.enrichWriteException`
(backed by `GlutenFileFormatWriter.wrapWriteError` on Spark 4.0) for task failure handling.
* Respect the 'mapreduce.output.basename' configuration when generating file
names, following https://github.com/apache/spark/pull/48494
* Refactor imports and variable initializations for improved clarity and
consistency
* Remove the now-passing test exclusions (SPARK-49991, SPARK-6330, SPARK-7837)
* Assert on the cause message
* Wrap exceptions thrown while committing or aborting a task with the output
path to provide better diagnostics
* Fix minor syntax inconsistencies (e.g. a stray trailing semicolon)
---------
Co-authored-by: Chang chen <[email protected]>
---
.../execution/SparkWriteFilesCommitProtocol.scala | 24 +++++++++++++++-------
.../gluten/utils/velox/VeloxTestSettings.scala | 3 ---
.../org/apache/gluten/sql/shims/SparkShims.scala | 9 ++++++--
.../gluten/sql/shims/spark40/Spark40Shims.scala | 7 +++++--
.../sql/execution/GlutenFileFormatWriter.scala | 6 ++++++
5 files changed, 35 insertions(+), 14 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
index 13a9b987f3..b29f6a10f1 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution
+import org.apache.gluten.sql.shims.SparkShimLoader
+
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec,
HadoopMapReduceCommitProtocol}
@@ -44,15 +46,15 @@ class SparkWriteFilesCommitProtocol(
extends Logging {
assert(committer.isInstanceOf[HadoopMapReduceCommitProtocol])
- val sparkStageId = TaskContext.get().stageId()
- val sparkPartitionId = TaskContext.get().partitionId()
- val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt &
Int.MaxValue
+ val sparkStageId: Int = TaskContext.get().stageId()
+ val sparkPartitionId: Int = TaskContext.get().partitionId()
+ val sparkAttemptNumber: Int = TaskContext.get().taskAttemptId().toInt &
Int.MaxValue
private val jobId = createJobID(jobTrackerID, sparkStageId)
private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
private val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
- private var fileNames: mutable.Set[String] = null
+ private var fileNames: mutable.Set[String] = _
// Set up the attempt context required to use in the output committer.
val taskAttemptContext: TaskAttemptContext = {
@@ -86,7 +88,9 @@ class SparkWriteFilesCommitProtocol(
// Note that %05d does not truncate the split number, so if we have more
than 100000 tasks,
// the file name is fine and won't overflow.
val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId
- val fileName =
f"${spec.prefix}part-$split%05d-${UUID.randomUUID().toString()}${spec.suffix}"
+ val basename =
taskAttemptContext.getConfiguration.get("mapreduce.output.basename", "part")
+ val fileName =
f"${spec.prefix}$basename-$split%05d-${UUID.randomUUID().toString}${spec.suffix}"
+
fileNames += fileName
fileName
}
@@ -103,7 +107,13 @@ class SparkWriteFilesCommitProtocol(
stagingDir.toString
}
- def commitTask(): Unit = {
+ private def enrichWriteError[T](path: => String)(f: => T): T = try {
+ f
+ } catch { // let the shim rethrow or wrap the failure, tagged with the caller-supplied path
+ case t: Throwable => SparkShimLoader.getSparkShims.enrichWriteException(t,
path)
+ }
+
+ def commitTask(): Unit = enrichWriteError(description.path) {
val (_, taskCommitTime) = Utils.timeTakenMs {
committer.commitTask(taskAttemptContext)
}
@@ -114,7 +124,7 @@ class SparkWriteFilesCommitProtocol(
}
}
- def abortTask(writePath: String): Unit = {
+ def abortTask(writePath: String): Unit = enrichWriteError(description.path) {
committer.abortTask(taskAttemptContext)
// Deletes the files written by current task.
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index a63ca37d3f..bad6ccb015 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -475,9 +475,6 @@ class VeloxTestSettings extends BackendTestSettings {
// Velox parquet reader not allow offset zero.
.exclude("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings")
// TODO: fix in Spark-4.0
- .exclude("SPARK-49991: Respect 'mapreduce.output.basename' to generate
file names")
- .exclude("SPARK-6330 regression test")
- .exclude("SPARK-7837 Do not close output writer twice when commitTask()
fails")
.exclude("explode nested lists crossing a rowgroup boundary")
enableSuite[GlutenParquetV1PartitionDiscoverySuite]
enableSuite[GlutenParquetV2PartitionDiscoverySuite]
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 33f59ff20e..de220cab82 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -97,7 +97,7 @@ trait SparkShims {
def generateFileScanRDD(
sparkSession: SparkSession,
- readFunction: (PartitionedFile) => Iterator[InternalRow],
+ readFunction: PartitionedFile => Iterator[InternalRow],
filePartitions: Seq[FilePartition],
fileSourceScanExec: FileSourceScanExec): FileScanRDD
@@ -145,7 +145,7 @@ trait SparkShims {
Expression,
Expression,
Int,
- Int) => TypedImperativeAggregate[T]): Expression;
+ Int) => TypedImperativeAggregate[T]): Expression
def replaceMightContain[T](
expr: Expression,
@@ -343,6 +343,11 @@ trait SparkShims {
t)
}
+ // Default: rethrow the write failure unchanged; the Spark 4.0 shim overrides
this to wrap it with the output path.
+ def enrichWriteException(cause: Throwable, path: String): Nothing = {
+ throw cause
+ }
+
def getFileSourceScanStream(scan: FileSourceScanExec):
Option[SparkDataStream] = {
None
}
diff --git
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 88fe373175..80c22f2fad 100644
---
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -65,8 +65,8 @@ import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
import java.util.{Map => JMap}
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
class Spark40Shims extends SparkShims {
@@ -151,7 +151,7 @@ class Spark40Shims extends SparkShims {
options: CaseInsensitiveStringMap,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): TextScan = {
- new TextScan(
+ TextScan(
sparkSession,
fileIndex,
dataSchema,
@@ -742,6 +742,9 @@ class Spark40Shims extends SparkShims {
throw t
}
+ override def enrichWriteException(cause: Throwable, path: String): Nothing =
{
+ GlutenFileFormatWriter.wrapWriteError(cause, path)
+ }
override def getFileSourceScanStream(scan: FileSourceScanExec):
Option[SparkDataStream] = {
scan.stream
}
diff --git
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/GlutenFileFormatWriter.scala
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/GlutenFileFormatWriter.scala
index bd51939f9d..8dc5fbc964 100644
---
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/GlutenFileFormatWriter.scala
+++
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/GlutenFileFormatWriter.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.{FileFormatWriter,
WriteJobDescription, WriteTaskResult}
object GlutenFileFormatWriter {
@@ -40,4 +41,9 @@ object GlutenFileFormatWriter {
None
)
}
+
+ // Always throws: wraps `cause` in QueryExecutionErrors.taskFailedWhileWritingRowsError for `writePath`
+ def wrapWriteError(cause: Throwable, writePath: String): Nothing = {
+ throw QueryExecutionErrors.taskFailedWhileWritingRowsError(writePath,
cause)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]