This is an automated email from the ASF dual-hosted git repository.

mingliang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 059845c41a [GLUTEN-8580][CORE][Part-1] Clean up unnecessary code related to input file expression (#8584)
059845c41a is described below

commit 059845c41ab8d2571b4b231e4e3812c94f2b2ad7
Author: Mingliang Zhu <[email protected]>
AuthorDate: Thu Jan 23 13:02:05 2025 +0800

    [GLUTEN-8580][CORE][Part-1] Clean up unnecessary code related to input file expression (#8584)
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     | 13 +++++-----
 .../execution/GlutenWholeStageColumnarRDD.scala    | 16 +-----------
 .../sql/execution/InputFileBlockHolderProxy.scala  | 30 ----------------------
 3 files changed, 7 insertions(+), 52 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index b041b1b817..f6718be48d 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -235,25 +235,24 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
     val planByteArray = wsCtx.root.toProtobuf.toByteArray
     splitInfos.zipWithIndex.map {
       case (splits, index) =>
-        val (splitInfosByteArray, files) = splits.zipWithIndex.map {
+        val splitInfosByteArray = splits.zipWithIndex.map {
           case (split, i) =>
             split match {
               case filesNode: LocalFilesNode =>
                 setFileSchemaForLocalFiles(filesNode, scans(i))
-                (filesNode.toProtobuf.toByteArray, filesNode.getPaths.asScala.toSeq)
+                filesNode.toProtobuf.toByteArray
               case extensionTableNode: ExtensionTableNode =>
-                (extensionTableNode.toProtobuf.toByteArray, extensionTableNode.getPartList)
+                extensionTableNode.toProtobuf.toByteArray
               case kafkaSourceNode: StreamKafkaSourceNode =>
-                (kafkaSourceNode.toProtobuf.toByteArray, Seq.empty)
+                kafkaSourceNode.toProtobuf.toByteArray
             }
-        }.unzip
+        }
 
         GlutenPartition(
           index,
           planByteArray,
           splitInfosByteArray.toArray,
-          locations = splits.flatMap(_.preferredLocations().asScala).toArray,
-          files.flatten.toArray
+          locations = splits.flatMap(_.preferredLocations().asScala).toArray
         )
     }
   }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index a54bbfad4e..757fa2459c 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -23,7 +23,6 @@ import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics}
 import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.execution.InputFileBlockHolderProxy
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -37,9 +36,7 @@ case class GlutenPartition(
     index: Int,
     plan: Array[Byte],
     splitInfosByteArray: Array[Array[Byte]] = Array.empty[Array[Byte]],
-    locations: Array[String] = Array.empty[String],
-    files: Array[String] =
-      Array.empty[String] // touched files, for implementing UDF input_file_names
+    locations: Array[String] = Array.empty[String]
 ) extends BaseGlutenPartition {
 
   override def preferredLocations(): Array[String] = locations
@@ -62,17 +59,6 @@ class GlutenWholeStageColumnarRDD(
   private val numaBindingInfo = GlutenConfig.get.numaBindingInfo
 
   override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
-
-    // To support input_file_name(). According to semantic we should return
-    // the exact file name a row belongs to. However in columnar engine it's
-    // not easy to accomplish this. so we return a list of file(part) names
-    split match {
-      case FirstZippedPartitionsPartition(_, g: GlutenPartition, _) =>
-        InputFileBlockHolderProxy.set(g.files.mkString(","))
-      case _ =>
-        InputFileBlockHolderProxy.unset()
-    }
-
     GlutenTimeMetric.millis(pipelineTime) {
       _ =>
         ExecutorManager.tryTaskSet(numaBindingInfo)
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/InputFileBlockHolderProxy.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/InputFileBlockHolderProxy.scala
deleted file mode 100644
index eed321403c..0000000000
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/InputFileBlockHolderProxy.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.InputFileBlockHolder
-
-object InputFileBlockHolderProxy {
-  def set(files: String): Unit = {
-    InputFileBlockHolder.set(files, 0, 0)
-  }
-
-  def unset(): Unit = {
-    InputFileBlockHolder.unset()
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to