This is an automated email from the ASF dual-hosted git repository.
mingliang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 059845c41a [GLUTEN-8580][CORE][Part-1] Clean up unnecessary code related to input file expression (#8584)
059845c41a is described below
commit 059845c41ab8d2571b4b231e4e3812c94f2b2ad7
Author: Mingliang Zhu <[email protected]>
AuthorDate: Thu Jan 23 13:02:05 2025 +0800
[GLUTEN-8580][CORE][Part-1] Clean up unnecessary code related to input file expression (#8584)
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 13 +++++-----
.../execution/GlutenWholeStageColumnarRDD.scala | 16 +-----------
.../sql/execution/InputFileBlockHolderProxy.scala | 30 ----------------------
3 files changed, 7 insertions(+), 52 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index b041b1b817..f6718be48d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -235,25 +235,24 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
val planByteArray = wsCtx.root.toProtobuf.toByteArray
splitInfos.zipWithIndex.map {
case (splits, index) =>
- val (splitInfosByteArray, files) = splits.zipWithIndex.map {
+ val splitInfosByteArray = splits.zipWithIndex.map {
case (split, i) =>
split match {
case filesNode: LocalFilesNode =>
setFileSchemaForLocalFiles(filesNode, scans(i))
- (filesNode.toProtobuf.toByteArray, filesNode.getPaths.asScala.toSeq)
+ filesNode.toProtobuf.toByteArray
case extensionTableNode: ExtensionTableNode =>
- (extensionTableNode.toProtobuf.toByteArray, extensionTableNode.getPartList)
+ extensionTableNode.toProtobuf.toByteArray
case kafkaSourceNode: StreamKafkaSourceNode =>
- (kafkaSourceNode.toProtobuf.toByteArray, Seq.empty)
+ kafkaSourceNode.toProtobuf.toByteArray
}
- }.unzip
+ }
GlutenPartition(
index,
planByteArray,
splitInfosByteArray.toArray,
- locations = splits.flatMap(_.preferredLocations().asScala).toArray,
- files.flatten.toArray
+ locations = splits.flatMap(_.preferredLocations().asScala).toArray
)
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index a54bbfad4e..757fa2459c 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -23,7 +23,6 @@ import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics}
import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.execution.InputFileBlockHolderProxy
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -37,9 +36,7 @@ case class GlutenPartition(
index: Int,
plan: Array[Byte],
splitInfosByteArray: Array[Array[Byte]] = Array.empty[Array[Byte]],
- locations: Array[String] = Array.empty[String],
- files: Array[String] = Array.empty[String] // touched files, for implementing UDF input_file_names
+ locations: Array[String] = Array.empty[String]
) extends BaseGlutenPartition {
override def preferredLocations(): Array[String] = locations
@@ -62,17 +59,6 @@ class GlutenWholeStageColumnarRDD(
private val numaBindingInfo = GlutenConfig.get.numaBindingInfo
override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
-
- // To support input_file_name(). According to semantic we should return
- // the exact file name a row belongs to. However in columnar engine it's
- // not easy to accomplish this. so we return a list of file(part) names
- split match {
- case FirstZippedPartitionsPartition(_, g: GlutenPartition, _) =>
- InputFileBlockHolderProxy.set(g.files.mkString(","))
- case _ =>
- InputFileBlockHolderProxy.unset()
- }
-
GlutenTimeMetric.millis(pipelineTime) {
_ =>
ExecutorManager.tryTaskSet(numaBindingInfo)
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/InputFileBlockHolderProxy.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/InputFileBlockHolderProxy.scala
deleted file mode 100644
index eed321403c..0000000000
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/InputFileBlockHolderProxy.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.InputFileBlockHolder
-
-object InputFileBlockHolderProxy {
- def set(files: String): Unit = {
- InputFileBlockHolder.set(files, 0, 0)
- }
-
- def unset(): Unit = {
- InputFileBlockHolder.unset()
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]