This is an automated email from the ASF dual-hosted git repository.

kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d1d0bab66 [GLUTEN-8330][VL] Improve convert the viewfs path to hdfs 
path (#8331)
5d1d0bab66 is described below

commit 5d1d0bab66ecb974d0b3d635a6d1264c1803e150
Author: Yuming Wang <[email protected]>
AuthorDate: Thu Dec 26 08:57:03 2024 +0800

    [GLUTEN-8330][VL] Improve convert the viewfs path to hdfs path (#8331)
---
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 20 +++---
 .../gluten/execution/WholeStageTransformer.scala   | 24 ++-----
 .../hadoop/fs/viewfs/ViewFileSystemUtils.scala     | 78 ++++++++++++++++++++++
 3 files changed, 93 insertions(+), 29 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 4d66c26913..daa398d370 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -49,8 +49,10 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils
 
+import scala.collection.mutable
 import scala.util.control.Breaks.breakable
 
 class VeloxBackend extends SubstraitBackend {
@@ -107,17 +109,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
       val filteredRootPaths = distinctRootPaths(rootPaths)
       if (filteredRootPaths.nonEmpty) {
         val resolvedPaths =
-          if (
-            GlutenConfig.getConf.enableHdfsViewfs && 
filteredRootPaths.head.startsWith("viewfs")
-          ) {
-            // Convert the viewfs path to hdfs path.
-            filteredRootPaths.map {
-              viewfsPath =>
-                val viewPath = new Path(viewfsPath)
-                val viewFileSystem =
-                  FileSystem.get(viewPath.toUri, 
serializableHadoopConf.get.value)
-                viewFileSystem.resolvePath(viewPath).toString
-            }
+          if (GlutenConfig.getConf.enableHdfsViewfs) {
+            ViewFileSystemUtils.convertViewfsToHdfs(
+              filteredRootPaths,
+              mutable.Map.empty[String, String],
+              serializableHadoopConf.get.value)
           } else {
             filteredRootPaths
           }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index fc38d0c3ff..003efb5f9a 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
 
 import com.google.common.collect.Lists
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -377,26 +377,16 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
       val allScanSplitInfos =
         getSplitInfosFromPartitions(basicScanExecTransformers, 
allScanPartitions)
       if (GlutenConfig.getConf.enableHdfsViewfs) {
+        val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
         allScanSplitInfos.foreach {
           splitInfos =>
             splitInfos.foreach {
               case splitInfo: LocalFilesNode =>
-                val paths = splitInfo.getPaths.asScala
-                if (paths.nonEmpty && paths.exists(_.startsWith("viewfs"))) {
-                  // Convert the viewfs path into hdfs
-                  val newPaths = paths.map {
-                    viewfsPath =>
-                      var finalPath = viewfsPath
-                      while (finalPath.startsWith("viewfs")) {
-                        val viewPath = new Path(finalPath)
-                        val viewFileSystem =
-                          FileSystem.get(viewPath.toUri, 
serializableHadoopConf.value)
-                        finalPath = 
viewFileSystem.resolvePath(viewPath).toString
-                      }
-                      finalPath
-                  }
-                  splitInfo.setPaths(newPaths.asJava)
-                }
+                val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
+                  splitInfo.getPaths.asScala.toSeq,
+                  viewfsToHdfsCache,
+                  serializableHadoopConf.value)
+                splitInfo.setPaths(newPaths.asJava)
             }
         }
       }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/hadoop/fs/viewfs/ViewFileSystemUtils.scala
 
b/gluten-substrait/src/main/scala/org/apache/hadoop/fs/viewfs/ViewFileSystemUtils.scala
new file mode 100644
index 0000000000..840f2b0772
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/hadoop/fs/viewfs/ViewFileSystemUtils.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import scala.collection.mutable
+
+object ViewFileSystemUtils {
+
+  /**
+   * Convert the viewfs path to hdfs path. Similar to 
ViewFileSystem.resolvePath, but does not make
+   * RPC calls.
+   */
+  def convertViewfsToHdfs(f: String, hadoopConfig: Configuration): String = {
+    val path = new Path(f)
+    FileSystem.get(path.toUri, hadoopConfig) match {
+      case vfs: ViewFileSystem =>
+        val fsStateField = vfs.getClass.getDeclaredField("fsState")
+        fsStateField.setAccessible(true)
+        val fsState = fsStateField.get(vfs).asInstanceOf[InodeTree[FileSystem]]
+        val res = fsState.resolve(f, true)
+        if (res.isInternalDir) {
+          f
+        } else {
+          Path.mergePaths(new Path(res.targetFileSystem.getUri), 
res.remainingPath).toString
+        }
+      case otherFileSystem =>
+        otherFileSystem.resolvePath(path).toString
+    }
+  }
+
+  /**
+   * Convert a sequence of viewfs path to a sequence of hdfs path.
+   * @param paths
+   *   sequence of viewfs path
+   * @param viewfsToHdfsCache
+   *   A map use to cache converted paths
+   * @param hadoopConfig
+   *   Hadoop configuration
+   * @return
+   *   sequence of hdfs path
+   */
+  def convertViewfsToHdfs(
+      paths: Seq[String],
+      viewfsToHdfsCache: mutable.Map[String, String],
+      hadoopConfig: Configuration): Seq[String] = {
+    paths.map {
+      path =>
+        if (path.startsWith("viewfs")) {
+          val pathSplit = path.split(Path.SEPARATOR)
+          val prefixIndex = pathSplit.size - 1
+          val pathPrefix = pathSplit.take(prefixIndex).mkString(Path.SEPARATOR)
+          val hdfsPath = viewfsToHdfsCache.getOrElseUpdate(
+            pathPrefix,
+            convertViewfsToHdfs(pathPrefix, hadoopConfig))
+          hdfsPath + Path.SEPARATOR + 
pathSplit.drop(prefixIndex).mkString(Path.SEPARATOR)
+        } else {
+          path
+        }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to