This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5d1d0bab66 [GLUTEN-8330][VL] Improve conversion of viewfs paths to hdfs paths (#8331)
5d1d0bab66 is described below
commit 5d1d0bab66ecb974d0b3d635a6d1264c1803e150
Author: Yuming Wang <[email protected]>
AuthorDate: Thu Dec 26 08:57:03 2024 +0800
[GLUTEN-8330][VL] Improve conversion of viewfs paths to hdfs paths (#8331)
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 20 +++---
.../gluten/execution/WholeStageTransformer.scala | 24 ++-----
.../hadoop/fs/viewfs/ViewFileSystemUtils.scala | 78 ++++++++++++++++++++++
3 files changed, 93 insertions(+), 29 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 4d66c26913..daa398d370 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -49,8 +49,10 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils
 
+import scala.collection.mutable
 import scala.util.control.Breaks.breakable
 
 class VeloxBackend extends SubstraitBackend {
@@ -107,17 +109,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
     val filteredRootPaths = distinctRootPaths(rootPaths)
     if (filteredRootPaths.nonEmpty) {
       val resolvedPaths =
-        if (
-          GlutenConfig.getConf.enableHdfsViewfs &&
-          filteredRootPaths.head.startsWith("viewfs")
-        ) {
-          // Convert the viewfs path to hdfs path.
-          filteredRootPaths.map {
-            viewfsPath =>
-              val viewPath = new Path(viewfsPath)
-              val viewFileSystem =
-                FileSystem.get(viewPath.toUri, serializableHadoopConf.get.value)
-              viewFileSystem.resolvePath(viewPath).toString
-          }
+        if (GlutenConfig.getConf.enableHdfsViewfs) {
+          ViewFileSystemUtils.convertViewfsToHdfs(
+            filteredRootPaths,
+            mutable.Map.empty[String, String],
+            serializableHadoopConf.get.value)
         } else {
           filteredRootPaths
         }
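
For readers following the change: the helper now owns the per-path viewfs
check, so non-viewfs root paths pass through unchanged and only the
enableHdfsViewfs flag gates the call. A minimal sketch of the new call shape,
assuming a locally built Configuration and made-up root paths in place of the
values Gluten supplies:

    import scala.collection.mutable
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils

    val hadoopConf = new Configuration() // stand-in for serializableHadoopConf.get.value
    // Hypothetical paths: the viewfs entry is rewritten, the hdfs entry passes through.
    val rootPaths = Seq("viewfs://cluster/warehouse/t1", "hdfs://nn1/warehouse/t2")
    val resolvedPaths = ViewFileSystemUtils.convertViewfsToHdfs(
      rootPaths,
      mutable.Map.empty[String, String], // fresh cache; root paths are converted once per query
      hadoopConf)
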
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index fc38d0c3ff..003efb5f9a 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
 
 import com.google.common.collect.Lists
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -377,26 +377,16 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
     val allScanSplitInfos =
       getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)
     if (GlutenConfig.getConf.enableHdfsViewfs) {
+      val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
       allScanSplitInfos.foreach {
         splitInfos =>
           splitInfos.foreach {
             case splitInfo: LocalFilesNode =>
-              val paths = splitInfo.getPaths.asScala
-              if (paths.nonEmpty && paths.exists(_.startsWith("viewfs"))) {
-                // Convert the viewfs path into hdfs
-                val newPaths = paths.map {
-                  viewfsPath =>
-                    var finalPath = viewfsPath
-                    while (finalPath.startsWith("viewfs")) {
-                      val viewPath = new Path(finalPath)
-                      val viewFileSystem =
-                        FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
-                      finalPath = viewFileSystem.resolvePath(viewPath).toString
-                    }
-                    finalPath
-                }
-                splitInfo.setPaths(newPaths.asJava)
-              }
+              val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
+                splitInfo.getPaths.asScala.toSeq,
+                viewfsToHdfsCache,
+                serializableHadoopConf.value)
+              splitInfo.setPaths(newPaths.asJava)
           }
       }
     }
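
The notable change in this hunk is viewfsToHdfsCache: one mutable map is shared
across every LocalFilesNode in the stage, so files under the same parent
directory reuse a single mount-table resolution instead of calling
FileSystem.get and resolvePath once per path. A hedged sketch of the pattern,
with made-up split paths and a locally built Configuration:

    import scala.collection.mutable
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils

    val hadoopConf = new Configuration() // stand-in for serializableHadoopConf.value
    val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
    // Hypothetical splits: the two t1 part files share one cached prefix.
    val splitPaths = Seq(
      Seq("viewfs://cluster/db/t1/part-00000", "viewfs://cluster/db/t1/part-00001"),
      Seq("viewfs://cluster/db/t2/part-00000"))
    val resolved = splitPaths.map {
      paths => ViewFileSystemUtils.convertViewfsToHdfs(paths, viewfsToHdfsCache, hadoopConf)
    }
    // viewfsToHdfsCache now holds one entry per distinct parent directory.
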
diff --git a/gluten-substrait/src/main/scala/org/apache/hadoop/fs/viewfs/ViewFileSystemUtils.scala b/gluten-substrait/src/main/scala/org/apache/hadoop/fs/viewfs/ViewFileSystemUtils.scala
new file mode 100644
index 0000000000..840f2b0772
--- /dev/null
+++ b/gluten-substrait/src/main/scala/org/apache/hadoop/fs/viewfs/ViewFileSystemUtils.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import scala.collection.mutable
+
+object ViewFileSystemUtils {
+
+  /**
+   * Convert a viewfs path to an hdfs path. Similar to ViewFileSystem.resolvePath, but does not
+   * make RPC calls.
+   */
+  def convertViewfsToHdfs(f: String, hadoopConfig: Configuration): String = {
+    val path = new Path(f)
+    FileSystem.get(path.toUri, hadoopConfig) match {
+      case vfs: ViewFileSystem =>
+        val fsStateField = vfs.getClass.getDeclaredField("fsState")
+        fsStateField.setAccessible(true)
+        val fsState = fsStateField.get(vfs).asInstanceOf[InodeTree[FileSystem]]
+        val res = fsState.resolve(f, true)
+        if (res.isInternalDir) {
+          f
+        } else {
+          Path.mergePaths(new Path(res.targetFileSystem.getUri), res.remainingPath).toString
+        }
+      case otherFileSystem =>
+        otherFileSystem.resolvePath(path).toString
+    }
+  }
+
+  /**
+   * Convert a sequence of viewfs paths to a sequence of hdfs paths.
+   * @param paths
+   *   sequence of viewfs paths
+   * @param viewfsToHdfsCache
+   *   a map used to cache converted paths
+   * @param hadoopConfig
+   *   Hadoop configuration
+   * @return
+   *   sequence of hdfs paths
+   */
+  def convertViewfsToHdfs(
+      paths: Seq[String],
+      viewfsToHdfsCache: mutable.Map[String, String],
+      hadoopConfig: Configuration): Seq[String] = {
+    paths.map {
+      path =>
+        if (path.startsWith("viewfs")) {
+          val pathSplit = path.split(Path.SEPARATOR)
+          val prefixIndex = pathSplit.size - 1
+          val pathPrefix = pathSplit.take(prefixIndex).mkString(Path.SEPARATOR)
+          val hdfsPath = viewfsToHdfsCache.getOrElseUpdate(
+            pathPrefix,
+            convertViewfsToHdfs(pathPrefix, hadoopConfig))
+          hdfsPath + Path.SEPARATOR + pathSplit.drop(prefixIndex).mkString(Path.SEPARATOR)
+        } else {
+          path
+        }
+    }
+  }
+}
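
Putting the two overloads together: the Seq overload caches by parent
directory, and the String overload resolves that directory through the
client-side mount table by reflecting into ViewFileSystem's private fsState
(an InodeTree), which is what lets it skip the NameNode RPC that
ViewFileSystem.resolvePath would otherwise make. A worked example, assuming a
hypothetical mount table that maps /data to hdfs://nn1/data:

    import org.apache.hadoop.fs.Path

    val path = "viewfs://cluster/data/db/part-00000.parquet"
    val pathSplit = path.split(Path.SEPARATOR)
    // pathSplit == Array("viewfs:", "", "cluster", "data", "db", "part-00000.parquet")
    val prefixIndex = pathSplit.size - 1
    val pathPrefix = pathSplit.take(prefixIndex).mkString(Path.SEPARATOR)
    // pathPrefix == "viewfs://cluster/data/db" -- the cache key, resolved once.
    // Under the assumed mount table the prefix resolves to "hdfs://nn1/data/db",
    // so the full path becomes "hdfs://nn1/data/db/part-00000.parquet"; any other
    // file in the same directory is rewritten from the cache with no further lookup.
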
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]