This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch dev-v6.0.0-decimal-cast
in repository https://gitbox.apache.org/repos/asf/auron.git

commit 0eaa812e107198689b79d8bd66cd30d5e0548b99
Author: Zhang Li <[email protected]>
AuthorDate: Thu Aug 7 17:02:38 2025 +0800

    fix execution error in non-native parquet sink tasks (#1123)

    Co-authored-by: zhangli20 <[email protected]>
---
 .../NativeParquetInsertIntoHiveTableExec.scala | 33 ++++++++++++++++++++--
 .../NativeParquetInsertIntoHiveTableBase.scala |  1 +
 .../blaze/plan/NativeParquetSinkBase.scala     |  3 ++
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala
index ef31c270..fccb22d9 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala
@@ -113,9 +113,17 @@ case class NativeParquetInsertIntoHiveTableExec(
       new BasicWriteJobStatsTracker(serializableHadoopConf, metrics) {
         override def newTaskInstance(): WriteTaskStatsTracker = {
           new BasicWriteTaskStatsTracker(serializableHadoopConf.value) {
-            override def newRow(_filePath: String, _row: InternalRow): Unit = {}
+            override def newRow(filePath: String, row: InternalRow): Unit = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.newRow(filePath, row)
+              }
+            }
 
             override def closeFile(filePath: String): Unit = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.closeFile(filePath)
+              }
+
               val outputFileStat = ParquetSinkTaskContext.get.processedOutputFiles.remove()
               for (_ <- 0L until outputFileStat.numRows) {
                 super.newRow(filePath, null)
@@ -147,12 +155,23 @@ case class NativeParquetInsertIntoHiveTableExec(
             mutable.ArrayBuffer.empty
 
             override def newPartition(partitionValues: InternalRow): Unit = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.newPartition(partitionValues)
+              }
               partitions.append(partitionValues)
             }
 
-            override def newRow(_row: InternalRow): Unit = {}
+            override def newRow(row: InternalRow): Unit = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.newRow(row)
+              }
+            }
 
             override def getFinalStats(): WriteTaskStats = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.getFinalStats()
+              }
+
               val outputFileStat = ParquetSinkTaskContext.get.processedOutputFiles.remove()
               BasicWriteTaskStats(
                 partitions = partitions,
@@ -179,9 +198,17 @@ case class NativeParquetInsertIntoHiveTableExec(
       new BasicWriteJobStatsTracker(serializableHadoopConf, metrics) {
         override def newTaskInstance(): WriteTaskStatsTracker = {
           new BasicWriteTaskStatsTracker(serializableHadoopConf.value) {
-            override def newRow(_row: InternalRow): Unit = {}
+            override def newRow(row: InternalRow): Unit = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.newRow(row)
+              }
+            }
 
             override def getFinalStats(): WriteTaskStats = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.getFinalStats()
+              }
+
               val outputFileStat = ParquetSinkTaskContext.get.processedOutputFiles.remove()
               BasicWriteTaskStats(
                 numPartitions = 1,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableBase.scala
index 8c4ee53d..cc8cd9bb 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableBase.scala
@@ -165,6 +165,7 @@ class BlazeMapredParquetOutputFormat
 case class OutputFileStat(path: String, numRows: Long, numBytes: Long)
 
 class ParquetSinkTaskContext {
+  var isNative: Boolean = false
   val processingOutputFiles = new LinkedBlockingDeque[String]()
   val processedOutputFiles = new util.ArrayDeque[OutputFileStat]()
 }
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkBase.scala
index f6b12886..1a405738 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkBase.scala
@@ -97,6 +97,9 @@ abstract class NativeParquetSinkBase(
       inputRDD.isShuffleReadFull,
       (partition, context) => {
+        // mark for native parquet sink
+        ParquetSinkTaskContext.get.isNative = true
+
         // init hadoop fs
         val resourceId = s"NativeParquetSinkExec:${UUID.randomUUID().toString}"
         JniBridge.resourcesMap.put(
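
Note for readers skimming the diff: the fix hinges on a per-task isNative flag that NativeParquetSinkBase sets only when the native parquet sink actually runs, so the overridden stats trackers can fall back to Spark's default row-by-row accounting for non-native write tasks. Below is a minimal, self-contained Scala sketch of that guard pattern; SimpleTaskContext, SimpleStatsTracker and NativeAwareStatsTracker are hypothetical stand-ins for ParquetSinkTaskContext and Spark's BasicWriteTaskStatsTracker, not code from this commit.

// Minimal sketch of the isNative guard pattern (hypothetical names, not from the commit).
object IsNativeGuardSketch {

  // Per-thread task context carrying the flag, loosely modeled on ParquetSinkTaskContext.get.
  class SimpleTaskContext {
    var isNative: Boolean = false
    var nativeRowCount: Long = 0L
  }
  object SimpleTaskContext {
    private val tl = new ThreadLocal[SimpleTaskContext] {
      override def initialValue(): SimpleTaskContext = new SimpleTaskContext
    }
    def get: SimpleTaskContext = tl.get()
  }

  // Default tracker: counts rows one by one, as Spark does for ordinary (non-native) write tasks.
  class SimpleStatsTracker {
    protected var numRows: Long = 0L
    def newRow(): Unit = numRows += 1
    def getFinalStats(): Long = numRows
  }

  // Native-aware tracker: ignores per-row callbacks and reads the count produced by the
  // native writer, but falls back to the parent behavior when the task is not native.
  class NativeAwareStatsTracker extends SimpleStatsTracker {
    override def newRow(): Unit = {
      if (!SimpleTaskContext.get.isNative) {
        return super.newRow()
      }
      // native path: rows are counted by the native writer, nothing to do per row
    }

    override def getFinalStats(): Long = {
      if (!SimpleTaskContext.get.isNative) {
        return super.getFinalStats()
      }
      SimpleTaskContext.get.nativeRowCount
    }
  }

  def main(args: Array[String]): Unit = {
    // non-native task: the guard delegates to the parent's row-by-row accounting
    val nonNative = new NativeAwareStatsTracker
    (1 to 3).foreach(_ => nonNative.newRow())
    println(s"non-native rows = ${nonNative.getFinalStats()}") // 3

    // native task: the sink sets the flag, stats come from the native side
    SimpleTaskContext.get.isNative = true
    SimpleTaskContext.get.nativeRowCount = 42L
    val native = new NativeAwareStatsTracker
    native.newRow() // ignored on the native path
    println(s"native rows = ${native.getFinalStats()}") // 42
  }
}

Keeping the check inside each overridden callback, rather than swapping tracker instances per task, lets a single tracker class serve both execution paths, which appears to be the design the commit follows.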
