This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch dev-v6.0.0-parallel-scan-kdev-build
in repository https://gitbox.apache.org/repos/asf/auron.git

commit 0eaa812e107198689b79d8bd66cd30d5e0548b99
Author: Zhang Li <[email protected]>
AuthorDate: Thu Aug 7 17:02:38 2025 +0800

    fix execution error in non-native parquet sink tasks (#1123)
    
    Co-authored-by: zhangli20 <[email protected]>
---
 .../NativeParquetInsertIntoHiveTableExec.scala     | 33 ++++++++++++++++++++--
 .../NativeParquetInsertIntoHiveTableBase.scala     |  1 +
 .../blaze/plan/NativeParquetSinkBase.scala         |  3 ++
 3 files changed, 34 insertions(+), 3 deletions(-)
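
In brief: the parquet sink stats trackers previously assumed every write task
went through the native sink and read per-file statistics from
ParquetSinkTaskContext.processedOutputFiles; those stats are only produced by
the native writer, so a task that fell back to the non-native parquet sink hit
the tracker overrides with nothing to read. The patch adds a per-task isNative
flag, sets it only on the native code path, and lets each override delegate to
Spark's default BasicWriteTaskStatsTracker behaviour when the flag is unset. A
minimal sketch of the resulting control flow, assuming ParquetSinkTaskContext.get
hands out one context per task (as its usage in the diffs below implies):

    // sketch only; the package is taken from the file paths in this commit
    import org.apache.spark.sql.execution.blaze.plan.ParquetSinkTaskContext

    val ctx = ParquetSinkTaskContext.get      // isNative defaults to false
    if (ctx.isNative) {
      // native sink ran: the native writer queued one OutputFileStat per file,
      // which the trackers drain to report row/byte counts (see closeFile below)
      val stat = ctx.processedOutputFiles.remove()
      println(s"${stat.path}: ${stat.numRows} rows, ${stat.numBytes} bytes")
    } else {
      // non-native task: the guarded overrides call straight through to super,
      // i.e. Spark's ordinary row-by-row stats accounting
    }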

diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala
index ef31c270..fccb22d9 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala
@@ -113,9 +113,17 @@ case class NativeParquetInsertIntoHiveTableExec(
       new BasicWriteJobStatsTracker(serializableHadoopConf, metrics) {
         override def newTaskInstance(): WriteTaskStatsTracker = {
           new BasicWriteTaskStatsTracker(serializableHadoopConf.value) {
-            override def newRow(_filePath: String, _row: InternalRow): Unit = {}
+            override def newRow(filePath: String, row: InternalRow): Unit = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.newRow(filePath, row)
+              }
+            }
 
             override def closeFile(filePath: String): Unit = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.closeFile(filePath)
+              }
+
               val outputFileStat = ParquetSinkTaskContext.get.processedOutputFiles.remove()
               for (_ <- 0L until outputFileStat.numRows) {
                 super.newRow(filePath, null)
@@ -147,12 +155,23 @@ case class NativeParquetInsertIntoHiveTableExec(
               mutable.ArrayBuffer.empty
 
             override def newPartition(partitionValues: InternalRow): Unit = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.newPartition(partitionValues)
+              }
               partitions.append(partitionValues)
             }
 
-            override def newRow(_row: InternalRow): Unit = {}
+            override def newRow(row: InternalRow): Unit = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.newRow(row)
+              }
+            }
 
             override def getFinalStats(): WriteTaskStats = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.getFinalStats()
+              }
+
               val outputFileStat = ParquetSinkTaskContext.get.processedOutputFiles.remove()
               BasicWriteTaskStats(
                 partitions = partitions,
@@ -179,9 +198,17 @@ case class NativeParquetInsertIntoHiveTableExec(
       new BasicWriteJobStatsTracker(serializableHadoopConf, metrics) {
         override def newTaskInstance(): WriteTaskStatsTracker = {
           new BasicWriteTaskStatsTracker(serializableHadoopConf.value) {
-            override def newRow(_row: InternalRow): Unit = {}
+            override def newRow(row: InternalRow): Unit = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.newRow(row)
+              }
+            }
 
             override def getFinalStats(): WriteTaskStats = {
+              if (!ParquetSinkTaskContext.get.isNative) {
+                return super.getFinalStats()
+              }
+
               val outputFileStat = ParquetSinkTaskContext.get.processedOutputFiles.remove()
               BasicWriteTaskStats(
                 numPartitions = 1,
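
All three trackers above apply the same guard: take the native-stats path only
when ParquetSinkTaskContext.get.isNative is set, otherwise fall through to the
inherited BasicWriteTaskStatsTracker behaviour via super. The repetition can be
read as one pattern; the helper below is hypothetical (not part of this patch)
and only names that guard:

    // hypothetical helper, shown to make the repeated guard explicit
    private def ifNative(nativePath: => Unit)(fallback: => Unit): Unit =
      if (ParquetSinkTaskContext.get.isNative) nativePath else fallback

    // the newRow override from the first hunk is then equivalent to:
    override def newRow(filePath: String, row: InternalRow): Unit =
      ifNative { /* rows are replayed later from the native OutputFileStat */ } {
        super.newRow(filePath, row)
      }
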
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableBase.scala
index 8c4ee53d..cc8cd9bb 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableBase.scala
@@ -165,6 +165,7 @@ class BlazeMapredParquetOutputFormat
 case class OutputFileStat(path: String, numRows: Long, numBytes: Long)
 
 class ParquetSinkTaskContext {
+  var isNative: Boolean = false
   val processingOutputFiles = new LinkedBlockingDeque[String]()
   val processedOutputFiles = new util.ArrayDeque[OutputFileStat]()
 }
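
The new flag defaults to false, so a context the native sink never touches
describes a non-native task. For reference, the class after this change looks
roughly like the sketch below (imports and OutputFileStat reconstructed from
the surrounding context lines; the companion object backing
ParquetSinkTaskContext.get sits outside this hunk and is assumed to provide
one context per task):

    import java.util
    import java.util.concurrent.LinkedBlockingDeque

    case class OutputFileStat(path: String, numRows: Long, numBytes: Long)

    class ParquetSinkTaskContext {
      // flipped to true only by the native parquet sink (NativeParquetSinkBase below)
      var isNative: Boolean = false
      // output files the native sink is currently working on (unchanged by this patch)
      val processingOutputFiles = new LinkedBlockingDeque[String]()
      // per-file stats produced by the native sink and drained by the stats trackers
      val processedOutputFiles = new util.ArrayDeque[OutputFileStat]()
    }
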
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkBase.scala
index f6b12886..1a405738 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkBase.scala
@@ -97,6 +97,9 @@ abstract class NativeParquetSinkBase(
       inputRDD.isShuffleReadFull,
       (partition, context) => {
 
+        // mark for native parquet sink
+        ParquetSinkTaskContext.get.isNative = true
+
         // init hadoop fs
         val resourceId = s"NativeParquetSinkExec:${UUID.randomUUID().toString}"
         JniBridge.resourcesMap.put(
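
Since the flag is flipped inside the native compute closure, only tasks that
actually enter the native parquet sink ever observe isNative == true; a task
planned with the ordinary (non-native) sink keeps the default, and the guarded
tracker overrides above reduce to plain super calls. A tiny sketch of that
ordering, assuming one ParquetSinkTaskContext per task:

    val ctx = ParquetSinkTaskContext.get
    assert(!ctx.isNative)   // the default, and all a non-native task ever sees
    ctx.isNative = true     // done only inside the native sink's closure, as above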
