This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c1ab7b38c7 [GLUTEN-5103][VL] Use jvm libhdfs replace c++ libhdfs3
c1ab7b38c7 is described below

commit c1ab7b38c7a0dce339eb25131d3916dbd98fd3d0
Author: JiaKe <[email protected]>
AuthorDate: Fri Oct 25 18:22:38 2024 +0800

    [GLUTEN-5103][VL] Use jvm libhdfs replace c++ libhdfs3
    
    This patch is merged together with the Velox daily rebase update (2024_10_25)
    
    New commits from Velox daily rebase:
    3fa7acf3c by Xiaoxuan Meng, Avoid local shuffle and global shuffle use the same hash value to avoid data skew (11338)
    19ea693cc by lingbin, Use 'if constexpr' for Buffer::is_pod_like_v<T> (11341)
    c5dc89d5a by Jenson, Fix nullable bug of Arrow MapVector in Bridge.cpp (11214)
    c14040f2c by Ke, Report inputSizeInBytes in HiveDataSink (11339)
    10cdf6fd8 by Jia Ke, Support jvm version libhdfs in velox (9835)
    8d77bebbe by Ke, Add single parameter (array<array<T>>) support for array_intersect (11305)
    3922aec50 by Xiaoxuan Meng, Fix local file sink to use velox fs as well as fail on exist file (11322)
    97aa713cc by Jimmy Lu, Make RowType::nameOf throw Velox exception (11336)
    14a74ebde by Duc Nguyen, Move ApproxPercentile intermediate type index to header file (11224)
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  4 +++-
 .../GlutenClickHouseMergeTreeWriteOnS3Suite.scala  |  2 +-
 .../GlutenClickHouseMergeTreeWriteSuite.scala      |  4 ++--
 .../backendsapi/velox/VeloxIteratorApi.scala       | 20 ++++++++++++----
 .../gluten/utils/SharedLibraryLoaderCentos7.scala  |  2 +-
 .../gluten/utils/SharedLibraryLoaderCentos8.scala  |  2 +-
 .../gluten/utils/SharedLibraryLoaderCentos9.scala  |  2 +-
 .../gluten/utils/SharedLibraryLoaderDebian11.scala |  2 +-
 .../gluten/utils/SharedLibraryLoaderDebian12.scala |  2 +-
 .../utils/SharedLibraryLoaderUbuntu2004.scala      |  2 +-
 .../utils/SharedLibraryLoaderUbuntu2204.scala      |  2 +-
 cpp/velox/CMakeLists.txt                           | 28 ----------------------
 cpp/velox/compute/WholeStageResultIterator.cc      |  7 ------
 cpp/velox/utils/HdfsUtils.h                        | 22 -----------------
 ep/build-velox/src/get_velox.sh                    |  2 +-
 ep/build-velox/src/modify_velox.patch              | 22 +----------------
 .../gluten/execution/IcebergScanTransformer.scala  |  5 +++-
 .../gluten/execution/VeloxIcebergSuite.scala       |  6 ++---
 .../apache/gluten/backendsapi/IteratorApi.scala    |  4 +++-
 .../execution/BasicScanExecTransformer.scala       | 12 ++++++----
 .../gluten/execution/WholeStageTransformer.scala   | 15 +++++++++---
 21 files changed, 60 insertions(+), 107 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 6760d29561..2fa2e4402b 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
 
 import java.lang.{Long => JLong}
 import java.net.URI
@@ -133,7 +134,8 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo = {
+      properties: Map[String, String],
+      serializableHadoopConf: SerializableConfiguration): SplitInfo = {
     partition match {
       case p: GlutenMergeTreePartition =>
         ExtensionTableBuilder
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index dfdf02d455..af67b01f49 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -771,7 +771,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
             case scanExec: BasicScanExecTransformer => scanExec
           }
           assertResult(1)(plans.size)
-          assertResult(1)(plans.head.getSplitInfos.size)
+          assertResult(1)(plans.head.getSplitInfos(null).size)
       }
     }
   }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index 186078c18d..0959832949 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1801,7 +1801,7 @@ class GlutenClickHouseMergeTreeWriteSuite
                 case scanExec: BasicScanExecTransformer => scanExec
               }
               assertResult(1)(plans.size)
-              assertResult(conf._2)(plans.head.getSplitInfos.size)
+              assertResult(conf._2)(plans.head.getSplitInfos(null).size)
           }
         }
       })
@@ -1910,7 +1910,7 @@ class GlutenClickHouseMergeTreeWriteSuite
                 case f: BasicScanExecTransformer => f
               }
               assertResult(2)(scanExec.size)
-              assertResult(conf._2)(scanExec(1).getSplitInfos.size)
+              assertResult(conf._2)(scanExec(1).getSplitInfos(null).size)
           }
         }
       })
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 061daaac0f..320d1f366c 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -39,7 +39,9 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}
+import org.apache.spark.util.{ExecutorManager, SerializableConfiguration, 
SparkDirectoryUtil}
+
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import java.lang.{Long => JLong}
 import java.nio.charset.StandardCharsets
@@ -55,7 +57,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo = {
+      properties: Map[String, String],
+      serializableHadoopConf: SerializableConfiguration): SplitInfo = {
     partition match {
       case f: FilePartition =>
         val (
@@ -66,7 +69,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
           modificationTimes,
           partitionColumns,
           metadataColumns) =
-          constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
+          constructSplitInfo(partitionSchema, f.files, metadataColumnNames, 
serializableHadoopConf)
         val preferredLocations =
           SoftAffinity.getFilePartitionLocations(f)
         LocalFilesBuilder.makeLocalFiles(
@@ -109,7 +112,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
   private def constructSplitInfo(
       schema: StructType,
       files: Array[PartitionedFile],
-      metadataColumnNames: Seq[String]) = {
+      metadataColumnNames: Seq[String],
+      serializableHadoopConf: SerializableConfiguration) = {
     val paths = new JArrayList[String]()
     val starts = new JArrayList[JLong]
     val lengths = new JArrayList[JLong]()
@@ -121,9 +125,15 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       file =>
         // The "file.filePath" in PartitionedFile is not the original encoded 
path, so the decoded
         // path is incorrect in some cases and here fix the case of ' ' by 
using GlutenURLDecoder
+        var filePath = file.filePath.toString
+        if (filePath.startsWith("viewfs")) {
+          val viewPath = new Path(filePath)
+          val viewFileSystem = FileSystem.get(viewPath.toUri, 
serializableHadoopConf.value)
+          filePath = viewFileSystem.resolvePath(viewPath).toString
+        }
         paths.add(
           GlutenURLDecoder
-            .decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
+            .decode(filePath, StandardCharsets.UTF_8.name()))
         starts.add(JLong.valueOf(file.start))
         lengths.add(JLong.valueOf(file.length))
         val (fileSize, modificationTime) =
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
index a5e638d9be..16cd18e41a 100755
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
@@ -37,7 +37,7 @@ class SharedLibraryLoaderCentos7 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
     loader.loadAndCreateLink("libre2.so.10", "libre2.so", false)
     loader.loadAndCreateLink("libzstd.so.1", "libzstd.so", false)
     loader.loadAndCreateLink("liblz4.so.1", "liblz4.so", false)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
index 5d8c18b8bb..0a75c30c22 100755
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderCentos8 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
     loader.loadAndCreateLink("libre2.so.0", "libre2.so", false)
     loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
   }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
index 694cf4c622..50f9fe4aaa 100755
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderCentos9 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
     loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
     loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
   }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
index 6927f2539e..06c065ba28 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
@@ -44,6 +44,6 @@ class SharedLibraryLoaderDebian11 extends SharedLibraryLoader 
{
     loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
     loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
index ce01f4399f..8018995328 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
@@ -50,6 +50,6 @@ class SharedLibraryLoaderDebian12 extends SharedLibraryLoader 
{
     loader.loadAndCreateLink("libevent-2.1.so.7", "libevent-2.1.so", false)
     loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
index 79c0518ea3..d1f21a0013 100755
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
@@ -57,7 +57,7 @@ class SharedLibraryLoaderUbuntu2004 extends 
SharedLibraryLoader {
     loader.loadAndCreateLink("libicudata.so.66", "libicudata.so", false)
     loader.loadAndCreateLink("libicuuc.so.66", "libicuuc.so", false)
     loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
     loader.loadAndCreateLink("libre2.so.5", "libre2.so", false)
     loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
     loader.loadAndCreateLink("libthrift-0.13.0.so", "libthrift.so", false)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
index a5d99ede42..3cf4d30237 100755
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderUbuntu2204 extends 
SharedLibraryLoader {
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
     loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
     loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
     loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
     loader.loadAndCreateLink("libthrift-0.16.0.so", "libthrift.so", false)
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 7eaf5f1e61..47e4ff8f13 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -109,28 +109,6 @@ macro(add_duckdb)
   endif()
 endmacro()
 
-macro(find_libhdfs3)
-  find_package(libhdfs3 CONFIG)
-  if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
-    set(LIBHDFS3_LIBRARY HDFS::hdfs3)
-  else()
-    find_path(libhdfs3_INCLUDE_DIR hdfs/hdfs.h)
-    set(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
-    find_library(libhdfs3_LIBRARY NAMES hdfs3)
-    find_package_handle_standard_args(libhdfs3 DEFAULT_MSG libhdfs3_INCLUDE_DIR
-                                      libhdfs3_LIBRARY)
-    add_library(HDFS::hdfs3 SHARED IMPORTED)
-    set_target_properties(
-      HDFS::hdfs3
-      PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${libhdfs3_INCLUDE_DIR}"
-                 IMPORTED_LOCATION "${libhdfs3_LIBRARY}")
-  endif()
-
-  if(NOT libhdfs3_FOUND)
-    message(FATAL_ERROR "LIBHDFS3 Library Not Found")
-  endif()
-endmacro()
-
 macro(find_re2)
   find_package(re2 CONFIG)
   if(re2_FOUND AND TARGET re2::re2)
@@ -209,10 +187,6 @@ set(VELOX_SRCS
     utils/Common.cc
     utils/VeloxBatchResizer.cc)
 
-if(ENABLE_HDFS)
-  list(APPEND VELOX_SRCS utils/HdfsUtils.cc)
-endif()
-
 if(ENABLE_S3)
   find_package(ZLIB)
 endif()
@@ -336,8 +310,6 @@ endif()
 
 if(ENABLE_HDFS)
   add_definitions(-DENABLE_HDFS)
-  find_libhdfs3()
-  target_link_libraries(velox PUBLIC HDFS::hdfs3)
 endif()
 
 if(ENABLE_S3)
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index ea5bd59794..0e1a9bed7b 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -22,10 +22,6 @@
 #include "velox/connectors/hive/HiveConnectorSplit.h"
 #include "velox/exec/PlanNodeStats.h"
 
-#ifdef ENABLE_HDFS
-#include "utils/HdfsUtils.h"
-#endif
-
 using namespace facebook;
 
 namespace gluten {
@@ -70,9 +66,6 @@ WholeStageResultIterator::WholeStageResultIterator(
       scanNodeIds_(scanNodeIds),
       scanInfos_(scanInfos),
       streamIds_(streamIds) {
-#ifdef ENABLE_HDFS
-  gluten::updateHdfsTokens(veloxCfg_.get());
-#endif
   spillStrategy_ = veloxCfg_->get<std::string>(kSpillStrategy, 
kSpillStrategyDefaultValue);
   auto spillThreadNum = veloxCfg_->get<uint32_t>(kSpillThreadNum, 
kSpillThreadNumDefaultValue);
   if (spillThreadNum > 0) {
diff --git a/cpp/velox/utils/HdfsUtils.h b/cpp/velox/utils/HdfsUtils.h
deleted file mode 100644
index 2e07d7ddf4..0000000000
--- a/cpp/velox/utils/HdfsUtils.h
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <velox/common/config/Config.h>
-#include <memory>
-namespace gluten {
-void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg);
-}
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index 87d425b5d0..3886d6f2b7 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -17,7 +17,7 @@
 set -exu
 
 VELOX_REPO=https://github.com/oap-project/velox.git
-VELOX_BRANCH=2024_10_24
+VELOX_BRANCH=2024_10_25
 VELOX_HOME=""
 
 OS=`uname -s`
diff --git a/ep/build-velox/src/modify_velox.patch 
b/ep/build-velox/src/modify_velox.patch
index 5c1aab2489..3d45ff4b48 100644
--- a/ep/build-velox/src/modify_velox.patch
+++ b/ep/build-velox/src/modify_velox.patch
@@ -74,27 +74,7 @@ diff --git a/CMakeLists.txt b/CMakeLists.txt
 index 7f7cbc92f..52adb1250 100644
 --- a/CMakeLists.txt
 +++ b/CMakeLists.txt
-@@ -242,10 +242,15 @@ if(VELOX_ENABLE_ABFS)
- endif()
- 
- if(VELOX_ENABLE_HDFS)
--  find_library(
--    LIBHDFS3
--    NAMES libhdfs3.so libhdfs3.dylib
--    HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
-+  find_package(libhdfs3)
-+  if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
-+    set(LIBHDFS3 HDFS::hdfs3)
-+  else()
-+    find_library(
-+      LIBHDFS3
-+      NAMES libhdfs3.so libhdfs3.dylib
-+      HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
-+  endif()
-   add_definitions(-DVELOX_ENABLE_HDFS3)
- endif()
- 
-@@ -385,7 +390,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS 
${BOOST_INCLUDE_LIBRARIES})
+@@ -386,7 +391,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS 
${BOOST_INCLUDE_LIBRARIES})
  # for reference. find_package(range-v3)
  
  set_source(gflags)
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 1cbeb52a92..10d24c317c 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil
 
@@ -58,7 +59,9 @@ case class IcebergScanTransformer(
 
   override lazy val fileFormat: ReadFileFormat = 
GlutenIcebergSourceUtil.getFileFormat(scan)
 
-  override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
+  override def getSplitInfosFromPartitions(
+      partitions: Seq[InputPartition],
+      serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
     val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
       scan,
       keyGroupedPartitioning,
diff --git 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index bb604f534f..5ebf8883c6 100644
--- 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++ 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
                 case plan if plan.isInstanceOf[IcebergScanTransformer] =>
                   assert(
                     
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 
3)
                 case _ => // do nothing
               }
               checkLengthAndPlan(df, 7)
@@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
                 case plan if plan.isInstanceOf[IcebergScanTransformer] =>
                   assert(
                     
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 
3)
                 case _ => // do nothing
               }
               checkLengthAndPlan(df, 7)
@@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
                 case plan if plan.isInstanceOf[IcebergScanTransformer] =>
                   assert(
                     
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 
1)
                 case _ => // do nothing
               }
               checkLengthAndPlan(df, 5)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 11211bd0da..69c9d37334 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
 
 trait IteratorApi {
 
@@ -37,7 +38,8 @@ trait IteratorApi {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo
+      properties: Map[String, String],
+      serializableHadoopConf: SerializableConfiguration): SplitInfo
 
   /** Generate native row partition. */
   def genPartitions(
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 912b93079f..d7b824b397 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, 
StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import com.google.protobuf.StringValue
 import io.substrait.proto.NamedStruct
@@ -62,11 +63,13 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
   def getProperties: Map[String, String] = Map.empty
 
   /** Returns the split infos that will be processed by the underlying native 
engine. */
-  def getSplitInfos: Seq[SplitInfo] = {
-    getSplitInfosFromPartitions(getPartitions)
+  def getSplitInfos(serializableHadoopConf: SerializableConfiguration): 
Seq[SplitInfo] = {
+    getSplitInfosFromPartitions(getPartitions, serializableHadoopConf)
   }
 
-  def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
+  def getSplitInfosFromPartitions(
+      partitions: Seq[InputPartition],
+      serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
     partitions.map(
       BackendsApiManager.getIteratorApiInstance
         .genSplitInfo(
@@ -74,7 +77,8 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
           getPartitionSchema,
           fileFormat,
           getMetadataColumns.map(_.name),
-          getProperties))
+          getProperties,
+          serializableHadoopConf))
   }
 
   override protected def doValidateInternal(): ValidationResult = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 7bd84e09d1..e1dfd3f570 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -40,6 +40,7 @@ import 
org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
 
 import com.google.common.collect.Lists
 
@@ -129,6 +130,8 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
     
BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext)
 
   val sparkConf: SparkConf = sparkContext.getConf
+  val serializableHadoopConf: SerializableConfiguration = new 
SerializableConfiguration(
+    sparkContext.hadoopConfiguration)
   val numaBindingInfo: GlutenNumaBindingInfo = 
GlutenConfig.getConf.numaBindingInfo
 
   @transient
@@ -289,12 +292,16 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
        */
       val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
       val allScanSplitInfos =
-        getSplitInfosFromPartitions(basicScanExecTransformers, 
allScanPartitions)
+        getSplitInfosFromPartitions(
+          basicScanExecTransformers,
+          allScanPartitions,
+          serializableHadoopConf)
       val inputPartitions =
         BackendsApiManager.getIteratorApiInstance.genPartitions(
           wsCtx,
           allScanSplitInfos,
           basicScanExecTransformers)
+
       val rdd = new GlutenWholeStageColumnarRDD(
         sparkContext,
         inputPartitions,
@@ -380,7 +387,8 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
 
   private def getSplitInfosFromPartitions(
       basicScanExecTransformers: Seq[BasicScanExecTransformer],
-      allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = {
+      allScanPartitions: Seq[Seq[InputPartition]],
+      serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] 
= {
     // If these are two scan transformers, they must have same partitions,
     // otherwise, exchange will be inserted. We should combine the two scan
     // transformers' partitions with same index, and set them together in
@@ -398,7 +406,8 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
     //  p1n  |  p2n    => substraitContext.setSplitInfo([p1n, p2n])
     val allScanSplitInfos =
       allScanPartitions.zip(basicScanExecTransformers).map {
-        case (partition, transformer) => 
transformer.getSplitInfosFromPartitions(partition)
+        case (partition, transformer) =>
+          transformer.getSplitInfosFromPartitions(partition, 
serializableHadoopConf)
       }
     val partitionLength = allScanSplitInfos.head.size
     if (allScanSplitInfos.exists(_.size != partitionLength)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to