This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new d840259ff19b fix: Support Databricks Spark runtime (#18256)
d840259ff19b is described below
commit d840259ff19b122cd779d6397067098acc6fd9b4
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Feb 27 11:58:02 2026 -0800
fix: Support Databricks Spark runtime (#18256)
---
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 17 ++++++++++
.../hudi/common/util/TestReflectionUtils.java | 15 +++++++++
.../hudi/hadoop/HiveHoodieTableFileIndex.java | 18 -----------
.../apache/hudi/common/util/ReflectionUtils.java | 16 ++++++++++
.../apache/hudi/SparkHoodieTableFileIndex.scala | 37 +++++++++++++++-------
5 files changed, 74 insertions(+), 29 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 636eb1faa408..06ca98a206e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -554,4 +554,21 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
void invalidate();
}
+
+ public static class NoopCache implements FileStatusCache {
+ @Override
+ public Option<List<StoragePathInfo>> get(StoragePath path) {
+ return Option.empty();
+ }
+
+ @Override
+ public void put(StoragePath path, List<StoragePathInfo> leafFiles) {
+ // no-op
+ }
+
+ @Override
+ public void invalidate() {
+ // no-op
+ }
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestReflectionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestReflectionUtils.java
index 15decc3e5c57..01042c139133 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestReflectionUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestReflectionUtils.java
@@ -22,9 +22,13 @@ package org.apache.hudi.common.util;
import org.apache.hudi.common.conflict.detection.DirectMarkerBasedDetectionStrategy;
import org.apache.hudi.common.conflict.detection.EarlyConflictDetectionStrategy;
import org.apache.hudi.common.conflict.detection.TimelineServerBasedDetectionStrategy;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
import org.junit.jupiter.api.Test;
+import static org.apache.hudi.common.util.ReflectionUtils.getMethod;
import static org.apache.hudi.common.util.ReflectionUtils.isSubClass;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -42,4 +46,15 @@ public class TestReflectionUtils {
assertTrue(isSubClass(subClassName2, TimelineServerBasedDetectionStrategy.class));
assertFalse(isSubClass(subClassName2, DirectMarkerBasedDetectionStrategy.class));
}
+
+ @Test
+ void testGetMethod() {
+ assertTrue(getMethod(HoodieStorage.class, "getScheme").isPresent());
+ assertTrue(getMethod(HoodieStorage.class, "listFiles", StoragePath.class).isPresent());
+ assertTrue(getMethod(HoodieStorage.class,
+ "listDirectEntries", StoragePath.class, StoragePathFilter.class).isPresent());
+ assertFalse(getMethod(HoodieStorage.class,
+ "listDirectEntries", StoragePathFilter.class).isPresent());
+ assertFalse(getMethod(HoodieStorage.class, "nonExistentMethod").isPresent());
+ }
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
index 8e446f78681f..70e8d7619dbf 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieTableQueryType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.StoragePath;
import org.slf4j.Logger;
@@ -81,21 +80,4 @@ public class HiveHoodieTableFileIndex extends BaseHoodieTableFileIndex {
// fetched by the query engine)
return new Object[0];
}
-
- static class NoopCache implements FileStatusCache {
- @Override
- public Option<List<StoragePathInfo>> get(StoragePath path) {
- return Option.empty();
- }
-
- @Override
- public void put(StoragePath path, List<StoragePathInfo> leafFiles) {
- // no-op
- }
-
- @Override
- public void invalidate() {
- // no-op
- }
- }
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-io/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index 21d91a8a3344..1d14ee5f6365 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -207,6 +207,22 @@ public class ReflectionUtils {
}
}
+ /**
+ * Gets a method based on the method name and type of parameters through reflection.
+ *
+ * @param clazz {@link Class} object
+ * @param methodName method name
+ * @param parametersType type of parameters
+ * @return {@link Option} of the method if found; {@code Option.empty()} if not found or error out
+ */
+ public static Option<Method> getMethod(Class<?> clazz, String methodName, Class<?>... parametersType) {
+ try {
+ return Option.of(clazz.getMethod(methodName, parametersType));
+ } catch (Throwable e) {
+ return Option.empty();
+ }
+ }
+
/**
* Checks if the given class with the name is a subclass of another class.
*
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 68b70687cfba..306e5aa457be 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -45,7 +46,9 @@ import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ByteType, DateType, IntegerType, LongType, ShortType, StringType, StructField, StructType}
+import org.slf4j.LoggerFactory
+import java.lang.reflect.{Array => JArray}
import java.util.Collections
import javax.annotation.concurrent.NotThreadSafe
@@ -410,6 +413,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
}
object SparkHoodieTableFileIndex extends SparkAdapterSupport {
+ private val LOG = LoggerFactory.getLogger(classOf[SparkHoodieTableFileIndex])
+ private val PUT_LEAF_FILES_METHOD_NAME = "putLeafFiles"
private def haveProperPartitionValues(partitionPaths: Seq[PartitionPath]) = {
partitionPaths.forall(_.values.length > 0)
@@ -491,17 +496,27 @@ object SparkHoodieTableFileIndex extends SparkAdapterSupport {
}
private def adapt(cache: FileStatusCache): BaseHoodieTableFileIndex.FileStatusCache = {
- new BaseHoodieTableFileIndex.FileStatusCache {
- override def get(path: StoragePath): org.apache.hudi.common.util.Option[java.util.List[StoragePathInfo]] =
- toJavaOption(cache.getLeafFiles(new Path(path.toUri)).map(opt => opt.map(
- e => HadoopFSUtils.convertToStoragePathInfo(e)).toList.asJava
- ))
-
- override def put(path: StoragePath, leafFiles: java.util.List[StoragePathInfo]): Unit =
- cache.putLeafFiles(new Path(path.toUri), leafFiles.asScala.map(e => new FileStatus(
- e.getLength, e.isDirectory, 0, e.getBlockSize, e.getModificationTime, new Path(e.getPath.toUri))).toArray)
-
- override def invalidate(): Unit = cache.invalidateAll()
+ // Certain Spark runtime like Databricks Spark has changed the FileStatusCache APIs
+ // so we need to check the API to avoid NoSuchMethodError
+ if (ReflectionUtils.getMethod(
+ classOf[FileStatusCache], PUT_LEAF_FILES_METHOD_NAME, classOf[Path],
+ JArray.newInstance(classOf[FileStatus], 0).getClass).isPresent) {
+ new BaseHoodieTableFileIndex.FileStatusCache {
+ override def get(path: StoragePath): org.apache.hudi.common.util.Option[java.util.List[StoragePathInfo]] =
+ toJavaOption(cache.getLeafFiles(new Path(path.toUri)).map(opt => opt.map(
+ e => HadoopFSUtils.convertToStoragePathInfo(e)).toList.asJava
+ ))
+
+ override def put(path: StoragePath, leafFiles: java.util.List[StoragePathInfo]): Unit =
+ cache.putLeafFiles(new Path(path.toUri), leafFiles.asScala.map(e => new FileStatus(
+ e.getLength, e.isDirectory, 0, e.getBlockSize, e.getModificationTime, new Path(e.getPath.toUri))).toArray)
+
+ override def invalidate(): Unit = cache.invalidateAll()
+ }
+ } else {
+ LOG.warn("Use no-op file status cache instead because the FileStatusCache APIs at runtime "
+ + "are different from open-source Spark")
+ new BaseHoodieTableFileIndex.NoopCache
}
}