This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 181af01a1a25 perf: Adding support for LatestBaseFilesPathFilter to
Spark File Index (#18136)
181af01a1a25 is described below
commit 181af01a1a25855af68798b50d1260d26fea4734
Author: Surya Prasanna <[email protected]>
AuthorDate: Thu Feb 26 08:46:47 2026 -0800
perf: Adding support for LatestBaseFilesPathFilter to Spark File Index
(#18136)
This PR adds an opt-in path filtering mechanism during file listing to
prevent Spark driver OOM errors when querying large Hudi datasets with multiple
file versions per partition.
Problem: When file listing is performed without filtering, all file
versions (including older ones) are loaded into driver memory, causing OOM on
large tables.
Solution: Added a new config
hoodie.datasource.read.file.index.optimize.listing.using.path.filter
(default: false) that enables HoodieROTablePathFilter during file listing to exclude
older file versions.
Summary and Changelog
Users can now enable path filtering during file listing to avoid loading
multiple file versions into memory on the driver. This is controlled by the new
config
hoodie.datasource.read.file.index.optimize.listing.using.path.filter.
Changes:
New Config: FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER (default:
false)
Enables path filtering during file listing to reduce driver memory pressure
Filters out older file versions, keeping only the latest files needed for
queries
API Extensions:
Extended HoodieTableMetadata, BaseTableMetadata, and
FileSystemBackedTableMetadata to accept optional StoragePathFilter parameter
Added FSUtils.getAllDataFilesInPartition overload with path filter support
Created HoodieROTableStoragePathFilter wrapper to adapt Hadoop PathFilter
to Hudi's StoragePathFilter interface
Spark Integration:
Updated BaseHoodieTableFileIndex to use path filter when enabled
Modified SparkHoodieTableFileIndex to apply HoodieROTablePathFilter during
partition listing
---
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 108 +++++--
.../java/org/apache/hudi/common/fs/FSUtils.java | 10 +-
.../hudi/common/table/view/NoOpTableMetadata.java | 6 +
.../apache/hudi/metadata/BaseTableMetadata.java | 4 +-
.../metadata/FileSystemBackedTableMetadata.java | 6 +-
.../apache/hudi/metadata/HoodieTableMetadata.java | 10 +-
.../index/TestBaseHoodieTableFileIndex.java | 2 +-
.../hudi/hadoop/HiveHoodieTableFileIndex.java | 1 +
.../hadoop/HoodieLatestBaseFilesPathFilter.java | 45 +++
.../hudi/hadoop/HoodieROTablePathFilter.java | 15 +-
.../hudi/hadoop/TestHoodieROTablePathFilter.java | 16 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 12 +
.../scala/org/apache/hudi/HoodieFileIndex.scala | 15 +
.../apache/hudi/SparkHoodieTableFileIndex.scala | 22 +-
.../sql/hudi/common/TestROPathFilterOnRead.scala | 352 +++++++++++++++++++++
15 files changed, 584 insertions(+), 40 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 71a05ebd2e0e..b1e2ff8624da 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -20,6 +20,8 @@ package org.apache.hudi;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -27,6 +29,7 @@ import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableQueryType;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.serialization.HoodieFileSliceSerializer;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -107,8 +110,9 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
@Getter(AccessLevel.PROTECTED)
private final List<StoragePath> queryPaths;
- private final boolean shouldIncludePendingCommits;
+ protected final boolean shouldIncludePendingCommits;
private final boolean shouldValidateInstant;
+ protected final boolean useLatestBaseFilesPathFilterForListing;
// The `shouldListLazily` variable controls how we initialize/refresh the
TableFileIndex:
// - non-lazy/eager listing (shouldListLazily=false): all partitions and
file slices will be loaded eagerly during initialization.
@@ -138,17 +142,18 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
private transient HoodieTableMetadata tableMetadata = null;
/**
- * @param engineContext Hudi engine-specific context
- * @param metaClient Hudi table's meta-client
- * @param configProperties unifying configuration (in the form
of generic properties)
- * @param queryType target query type
- * @param queryPaths target DFS paths being queried
- * @param specifiedQueryInstant instant as of which table is being
queried
- * @param shouldIncludePendingCommits flags whether file-index should
exclude any pending operations
- * @param shouldValidateInstant flags to validate whether query
instant is present in the timeline
- * @param fileStatusCache transient cache of fetched
[[FileStatus]]es
- * @param incrementalQueryStartTime start completion time for
incremental query (optional)
- * @param incrementalQueryEndTime end completion time for
incremental query (optional)
+ * @param engineContext Hudi engine-specific
context
+ * @param metaClient Hudi table's
meta-client
+ * @param configProperties unifying configuration
(in the form of generic properties)
+ * @param queryType target query type
+ * @param queryPaths target DFS paths being
queried
+ * @param useLatestBaseFilesPathFilterForListing memory optimization on
the driver while fetching read optimized results
+ * @param specifiedQueryInstant instant as of which
table is being queried
+ * @param shouldIncludePendingCommits flags whether
file-index should exclude any pending operations
+ * @param shouldValidateInstant flags to validate
whether query instant is present in the timeline
+ * @param fileStatusCache transient cache of
fetched [[FileStatus]]es
+ * @param incrementalQueryStartTime start completion time
for incremental query (optional)
+ * @param incrementalQueryEndTime end completion time
for incremental query (optional)
*/
public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
HoodieTableMetaClient metaClient,
@@ -156,6 +161,7 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
HoodieTableQueryType queryType,
List<StoragePath> queryPaths,
Option<String> specifiedQueryInstant,
+ boolean
useLatestBaseFilesPathFilterForListing,
boolean shouldIncludePendingCommits,
boolean shouldValidateInstant,
FileStatusCache fileStatusCache,
@@ -165,14 +171,17 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
.orElseGet(() -> new String[0]);
+ // Disable metadata when ro_path_filter is enabled.
this.metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(configProperties)
.enable(configProperties.getBoolean(ENABLE.key(),
DEFAULT_METADATA_ENABLE_FOR_READERS)
- && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient))
+ && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient)
+ && !useLatestBaseFilesPathFilterForListing)
.build();
this.queryType = queryType;
this.queryPaths = queryPaths;
+ this.useLatestBaseFilesPathFilterForListing =
useLatestBaseFilesPathFilterForListing;
this.specifiedQueryInstant = specifiedQueryInstant;
this.shouldIncludePendingCommits = shouldIncludePendingCommits;
this.shouldValidateInstant = shouldValidateInstant;
@@ -267,14 +276,70 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
}
- List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
HoodieTimeline activeTimeline = getActiveTimeline();
Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::requestedTime));
+ validate(activeTimeline, queryInstant);
- try (HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
- Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::requestedTime));
- validate(activeTimeline, queryInstant);
+ HoodieTimer timer = HoodieTimer.start();
+ List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions,
activeTimeline);
+ log.info("On {} with query instant as {}, it took {}ms to list all files
{} Hudi partitions",
+ metaClient.getTableConfig().getTableName(), queryInstant.orElse("N/A"),
+ timer.endTimer(), partitions.size());
+
+ // ROPathFilter optimization is only applicable for COW tables with
snapshot queries
+ // For MOR tables with READ_OPTIMIZED queries, we also only need base files
+ if (useLatestBaseFilesPathFilterForListing
+ && !shouldIncludePendingCommits
+ && (metaClient.getTableConfig().getTableType() ==
HoodieTableType.COPY_ON_WRITE
+ || queryType == HoodieTableQueryType.READ_OPTIMIZED)) {
+ return generatePartitionFileSlicesPostROTablePathFilter(partitions,
allFiles);
+ }
+ return filterFiles(partitions, activeTimeline, allFiles, queryInstant);
+ }
+
+ /**
+ * Generates FileSlices from the filtered files returned by ROPathFilter.
+ * This is a fast path that avoids constructing a full
HoodieTableFileSystemView.
+ * Only applicable for COW tables since ROPathFilter only returns base files.
+ *
+ * @param partitions List of partitions to process
+ * @param allFiles Files already filtered by ROPathFilter
+ * @return Map of PartitionPath to list of FileSlices
+ */
+ private Map<PartitionPath, List<FileSlice>>
generatePartitionFileSlicesPostROTablePathFilter(
+ List<PartitionPath> partitions, List<StoragePathInfo> allFiles) {
+ // Group files by partition path, then by file group ID
+ Map<String, PartitionPath> partitionsMap = new HashMap<>();
+ partitions.forEach(p -> partitionsMap.put(p.path, p));
+ Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new
HashMap<>();
+
+ for (StoragePathInfo pathInfo : allFiles) {
+ // Create FileSlice obj from StoragePathInfo.
+ String relPartitionPath = FSUtils.getRelativePartitionPath(basePath,
pathInfo.getPath().getParent());
+ HoodieBaseFile baseFile = new HoodieBaseFile(pathInfo);
+ // Use relative partition path for FileSlice - consistent with
HoodieTableFileSystemView
+ FileSlice fileSlice = new FileSlice(relPartitionPath,
baseFile.getCommitTime(), baseFile.getFileId());
+ fileSlice.setBaseFile(baseFile);
+
+ // Add the FileSlice to partitionToFileSlices
+ PartitionPath partitionPathObj = partitionsMap.get(relPartitionPath);
+ if (partitionPathObj != null) {
+ List<FileSlice> fileSlices =
partitionToFileSlices.computeIfAbsent(partitionPathObj, k -> new ArrayList<>());
+ fileSlices.add(fileSlice);
+ } else {
+ log.warn("Could not find partition path object for relative path: {}.
Skipping file: {}",
+ relPartitionPath, pathInfo.getPath());
+ }
+ }
+ return partitionToFileSlices;
+ }
+ private Map<PartitionPath, List<FileSlice>> filterFiles(List<PartitionPath>
partitions,
+
HoodieTimeline activeTimeline,
+
List<StoragePathInfo> allFiles,
+
Option<String> queryInstant) {
+ try (HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
// NOTE: For MOR table, when the compaction is inflight, we need to not
only fetch the
// latest slices, but also include the base and log files of the
second-last version of
// the file slice in the same file group as the latest file slice that
is under compaction.
@@ -391,7 +456,8 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
/**
* Load partition paths and it's files under the query table path.
*/
- private List<StoragePathInfo> listPartitionPathFiles(List<PartitionPath>
partitions) {
+ private List<StoragePathInfo> listPartitionPathFiles(List<PartitionPath>
partitions,
+ HoodieTimeline
activeTimeline) {
List<StoragePath> partitionPaths = partitions.stream()
// NOTE: We're using [[createPathUnsafe]] to create Hadoop's [[Path]]
objects
// instances more efficiently, provided that
@@ -420,7 +486,7 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
try {
Map<String, List<StoragePathInfo>> fetchedPartitionsMap =
-
tableMetadata.getAllFilesInPartitions(missingPartitionPathsMap.keySet());
+
tableMetadata.getAllFilesInPartitions(missingPartitionPathsMap.keySet(),
getPartitionPathFilter(activeTimeline));
// Ingest newly fetched partitions into cache
fetchedPartitionsMap.forEach((absolutePath, files) -> {
@@ -440,6 +506,10 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
}
}
+ protected Option<StoragePathFilter> getPartitionPathFilter(HoodieTimeline
activeTimeline) {
+ return Option.empty();
+ }
+
private void doRefresh() {
HoodieTimer timer = HoodieTimer.start();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index d1b584eca484..56c0cc3a5b46 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -481,6 +481,13 @@ public class FSUtils {
public static List<StoragePathInfo> getAllDataFilesInPartition(HoodieStorage
storage,
StoragePath
partitionPath)
throws IOException {
+ return getAllDataFilesInPartitionByPathFilter(storage, partitionPath,
Option.empty());
+ }
+
+ public static List<StoragePathInfo>
getAllDataFilesInPartitionByPathFilter(HoodieStorage storage,
+
StoragePath partitionPath,
+
Option<StoragePathFilter> pathFilterOption)
+ throws IOException {
final Set<String> validFileExtensions =
Arrays.stream(HoodieFileFormat.values())
.map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
final String logFileExtension =
HoodieFileFormat.HOODIE_LOG.getFileExtension();
@@ -488,7 +495,8 @@ public class FSUtils {
try {
return storage.listDirectEntries(partitionPath, path -> {
String extension = FSUtils.getFileExtension(path.getName());
- return validFileExtensions.contains(extension) ||
path.getName().contains(logFileExtension);
+ return (validFileExtensions.contains(extension) ||
path.getName().contains(logFileExtension))
+ && pathFilterOption.map(filter ->
filter.accept(path)).orElse(true);
}).stream().filter(StoragePathInfo::isFile).collect(Collectors.toList());
} catch (FileNotFoundException ex) {
// return empty FileStatus if partition does not exist already
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
index 0522e9ba50af..3425ca829e0f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
@@ -36,6 +36,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.RawKey;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.storage.StoragePathInfo;
import java.io.IOException;
@@ -75,6 +76,11 @@ class NoOpTableMetadata implements HoodieTableMetadata {
throw new HoodieMetadataException("Unsupported operation:
getAllFilesInPartitions!");
}
+ @Override
+ public Map<String, List<StoragePathInfo>>
getAllFilesInPartitions(Collection<String> partitionPaths,
Option<StoragePathFilter> pathFilterOption) throws IOException {
+ throw new HoodieMetadataException("Unsupported operation:
getAllFilesInPartitions!");
+ }
+
@Override
public Option<BloomFilter> getBloomFilter(String partitionName, String
fileName) throws HoodieMetadataException {
throw new HoodieMetadataException("Unsupported operation:
getBloomFilter!");
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index a567a8053c06..f59a257dd777 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -43,6 +43,7 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.StoragePathFilter;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -146,7 +147,8 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
}
@Override
- public Map<String, List<StoragePathInfo>>
getAllFilesInPartitions(Collection<String> partitions)
+ public Map<String, List<StoragePathInfo>>
getAllFilesInPartitions(Collection<String> partitions,
+
Option<StoragePathFilter> unused)
throws IOException {
ValidationUtils.checkArgument(isMetadataTableInitialized);
if (partitions.isEmpty()) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 5b1878ccd53e..4f6b0e439efa 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -42,6 +42,7 @@ import org.apache.hudi.expression.Predicates;
import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.storage.StoragePathInfo;
import java.io.FileNotFoundException;
@@ -243,7 +244,8 @@ public class FileSystemBackedTableMetadata extends
AbstractHoodieTableMetadata {
}
@Override
- public Map<String, List<StoragePathInfo>>
getAllFilesInPartitions(Collection<String> partitionPaths)
+ public Map<String, List<StoragePathInfo>>
getAllFilesInPartitions(Collection<String> partitionPaths,
+
Option<StoragePathFilter> pathFilterOption)
throws IOException {
if (partitionPaths == null || partitionPaths.isEmpty()) {
return Collections.emptyMap();
@@ -260,7 +262,7 @@ public class FileSystemBackedTableMetadata extends
AbstractHoodieTableMetadata {
partitionPathStr -> {
StoragePath partitionPath = new StoragePath(partitionPathStr);
return Pair.of(partitionPathStr,
- FSUtils.getAllDataFilesInPartition(getStorage(),
partitionPath));
+ FSUtils.getAllDataFilesInPartitionByPathFilter(getStorage(),
partitionPath, pathFilterOption));
}, parallelism);
engineContext.clearJobStatus();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 779965e808b4..1da1f77400e6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -32,6 +32,7 @@ import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.expression.Expression;
import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.storage.StoragePathInfo;
import org.slf4j.Logger;
@@ -153,8 +154,13 @@ public interface HoodieTableMetadata extends Serializable,
AutoCloseable {
*
* NOTE: Absolute partition paths are expected here
*/
- Map<String, List<StoragePathInfo>>
getAllFilesInPartitions(Collection<String> partitionPaths)
- throws IOException;
+ default Map<String, List<StoragePathInfo>>
getAllFilesInPartitions(Collection<String> partitionPaths)
+ throws IOException {
+ return getAllFilesInPartitions(partitionPaths, Option.empty());
+ }
+
+ Map<String, List<StoragePathInfo>>
getAllFilesInPartitions(Collection<String> partitionPaths,
+
Option<StoragePathFilter> pathFilterOption) throws IOException;
/**
* Get the bloom filter for the FileID from the metadata table.
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBaseHoodieTableFileIndex.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBaseHoodieTableFileIndex.java
index a596d48bb54e..5ad1a207b39c 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBaseHoodieTableFileIndex.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBaseHoodieTableFileIndex.java
@@ -74,7 +74,7 @@ class TestBaseHoodieTableFileIndex extends
HoodieCommonTestHarness {
public TestLocalIndex(HoodieEngineContext engineContext,
HoodieTableMetaClient metaClient, TypedProperties configProperties,
HoodieTableQueryType queryType,
List<StoragePath> queryPaths, Option<String>
specifiedQueryInstant, boolean shouldIncludePendingCommits, boolean
shouldValidateInstant,
FileStatusCache fileStatusCache, boolean
shouldListLazily, Option<String> startCompletionTime, Option<String>
endCompletionTime) {
- super(engineContext, metaClient, configProperties, queryType,
queryPaths, specifiedQueryInstant, shouldIncludePendingCommits,
shouldValidateInstant, fileStatusCache, shouldListLazily,
+ super(engineContext, metaClient, configProperties, queryType,
queryPaths, specifiedQueryInstant, false, shouldIncludePendingCommits,
shouldValidateInstant, fileStatusCache, shouldListLazily,
startCompletionTime, endCompletionTime);
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
index a77860e5892d..3502cc5345a8 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
@@ -55,6 +55,7 @@ public class HiveHoodieTableFileIndex extends
BaseHoodieTableFileIndex {
queryType,
queryPaths,
specifiedQueryInstant,
+ false,
shouldIncludePendingCommits,
true,
new NoopCache(),
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLatestBaseFilesPathFilter.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLatestBaseFilesPathFilter.java
new file mode 100644
index 000000000000..362ac8c53677
--- /dev/null
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLatestBaseFilesPathFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
+
+public class HoodieLatestBaseFilesPathFilter implements StoragePathFilter {
+
+ private HoodieROTablePathFilter roTablePathFilter;
+
+ public HoodieLatestBaseFilesPathFilter(HoodieROTablePathFilter
roTablePathFilter) {
+ this.roTablePathFilter = roTablePathFilter;
+ }
+
+ public HoodieLatestBaseFilesPathFilter(StorageConfiguration conf,
+ HoodieTableMetaClient metaClient,
+ HoodieTimeline completedTimeline) {
+ roTablePathFilter = new HoodieROTablePathFilter(conf, metaClient,
completedTimeline);
+ }
+
+ @Override
+ public boolean accept(StoragePath path) {
+ return roTablePathFilter.accept(path);
+ }
+}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 99a8c7f6710b..23ec057cf18c 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -99,17 +100,17 @@ public class HoodieROTablePathFilter implements
Configurable, PathFilter, Serial
private transient HoodieLocalEngineContext engineContext;
-
private transient HoodieStorage storage;
public HoodieROTablePathFilter() {
- this(new Configuration());
+ this(HadoopFSUtils.getStorageConf());
}
- public HoodieROTablePathFilter(Configuration conf) {
+ @VisibleForTesting
+ public HoodieROTablePathFilter(StorageConfiguration storageConf) {
this.hoodiePathCache = new ConcurrentHashMap<>();
this.nonHoodiePathCache = new HashSet<>();
- this.conf = HadoopFSUtils.getStorageConfWithCopy(conf);
+ this.conf = storageConf;
this.metaClientCache = new HashMap<>();
this.completedTimelineCache = new HashMap<>();
}
@@ -117,7 +118,7 @@ public class HoodieROTablePathFilter implements
Configurable, PathFilter, Serial
/**
* By passing metaClient and completedTimeline, we can sync the view seen
from this class against HoodieFileIndex class
*/
- public HoodieROTablePathFilter(Configuration conf,
+ public HoodieROTablePathFilter(StorageConfiguration conf,
HoodieTableMetaClient metaClient,
HoodieTimeline completedTimeline) {
this(conf);
@@ -138,6 +139,10 @@ public class HoodieROTablePathFilter implements
Configurable, PathFilter, Serial
return null;
}
+ public boolean accept(StoragePath path) {
+ return accept(new Path(path.toString()));
+ }
+
@Override
public boolean accept(Path path) {
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java
index 545182ed8c59..e7d25a8b6bd5 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java
@@ -25,10 +25,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestTable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieTimeTravelException;
+import org.apache.hudi.storage.StorageConfiguration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -51,7 +51,7 @@ public class TestHoodieROTablePathFilter extends
HoodieCommonTestHarness {
@BeforeEach
public void setUp() throws Exception {
initMetaClient();
- pathFilter = new
HoodieROTablePathFilter(metaClient.getStorageConf().unwrapAs(Configuration.class));
+ pathFilter = new HoodieROTablePathFilter(metaClient.getStorageConf());
testTable = HoodieTestTable.of(metaClient);
}
@@ -159,7 +159,7 @@ public class TestHoodieROTablePathFilter extends
HoodieCommonTestHarness {
validateTimestampAsOf(metaClient, commit001); // Should not throw
// Test 3: HoodieROTablePathFilter with Configuration-only constructor
- Configuration confWithAsOf = new
Configuration(metaClient.getStorageConf().unwrapAs(Configuration.class));
+ StorageConfiguration confWithAsOf = metaClient.getStorageConf();
confWithAsOf.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), commit003);
HoodieROTablePathFilter pathFilterWithAsOf = new
HoodieROTablePathFilter(confWithAsOf);
@@ -208,14 +208,14 @@ public class TestHoodieROTablePathFilter extends
HoodieCommonTestHarness {
// Test 1: HoodieROTablePathFilter without TIMESTAMP_AS_OF should work
(normal operation)
HoodieROTablePathFilter filterWithoutAsOf = new HoodieROTablePathFilter(
- metaClient.getStorageConf().unwrapAs(Configuration.class), metaClient,
+ metaClient.getStorageConf(), metaClient,
metaClient.getActiveTimeline().filterCompletedInstants());
assertTrue(filterWithoutAsOf.accept(file1Path), "File from commit001
should be accepted");
assertTrue(filterWithoutAsOf.accept(file3Path), "File from commit003
should be accepted");
// Test 2: HoodieROTablePathFilter with TIMESTAMP_AS_OF before inflight
should work
- Configuration confBeforeInflight = new
Configuration(metaClient.getStorageConf().unwrapAs(Configuration.class));
+ StorageConfiguration confBeforeInflight = metaClient.getStorageConf();
confBeforeInflight.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(),
commit001);
HoodieROTablePathFilter filterBeforeInflight = new
HoodieROTablePathFilter(confBeforeInflight, metaClient,
@@ -224,7 +224,7 @@ public class TestHoodieROTablePathFilter extends
HoodieCommonTestHarness {
assertTrue(filterBeforeInflight.accept(file1Path), "File from commit001
should be accepted with as.of.instant=001");
// Test 3: HoodieROTablePathFilter with TIMESTAMP_AS_OF after inflight
should fail during accept()
- Configuration confAfterInflight = new
Configuration(metaClient.getStorageConf().unwrapAs(Configuration.class));
+ StorageConfiguration confAfterInflight = metaClient.getStorageConf();
confAfterInflight.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), commit003);
HoodieROTablePathFilter filterAfterInflight = new
HoodieROTablePathFilter(confAfterInflight, metaClient,
@@ -236,7 +236,7 @@ public class TestHoodieROTablePathFilter extends
HoodieCommonTestHarness {
}, "Calling accept() with as.of.instant=003 should fail due to inflight
commit002");
// Test 4: Configuration-only constructor with TIMESTAMP_AS_OF after
inflight should fail during accept()
- Configuration confOnlyAfterInflight = new
Configuration(metaClient.getStorageConf().unwrapAs(Configuration.class));
+ StorageConfiguration confOnlyAfterInflight = metaClient.getStorageConf();
confOnlyAfterInflight.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(),
commit003);
HoodieROTablePathFilter filterConfOnlyAfterInflight = new
HoodieROTablePathFilter(confOnlyAfterInflight, metaClient,
@@ -278,7 +278,7 @@ public class TestHoodieROTablePathFilter extends
HoodieCommonTestHarness {
// when there are inflight commits that would cause validation to fail
// Test 1: HoodieROTablePathFilter should work without as.of.instant even
when inflight commits exist
- Configuration confWithoutAsOf = new
Configuration(metaClient.getStorageConf().unwrapAs(Configuration.class));
+ StorageConfiguration confWithoutAsOf = metaClient.getStorageConf().newInstance();
HoodieROTablePathFilter filterWithoutAsOf = new
HoodieROTablePathFilter(confWithoutAsOf, metaClient,
metaClient.getActiveTimeline().filterCompletedInstants());
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 8f10823cb870..7e92b47f3321 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -227,6 +227,18 @@ object DataSourceReadOptions {
" by carefully analyzing provided partition-column predicates and
deducing corresponding partition-path prefix from " +
" them (if possible).")
+ val FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER:
ConfigProperty[Boolean] =
+
ConfigProperty.key("hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("1.2.0")
+ .withDocumentation("Controls whether file listing is done using the
HoodieROTablePathFilter. " +
+ " This is mainly necessary when the metadata table is not enabled or
corrupted and the job " +
+ " is doing recursive calls to fetch the partition paths and the
dataset has multiple versions" +
+ " of the same file in the same partition and it could lead to Out of
Memory on the driver if" +
+ " the dataset is too large. Another important limitation is that this
config should not be" +
+ " used if there are bootstrap files present in the file system. NOTE:
Only works for COW tables with snapshot queries.")
+
val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN: ConfigProperty[String] =
ConfigProperty
.key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
.defaultValue("true")
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 9658fd451ec8..34a67cfb3a5b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -527,6 +527,21 @@ object HoodieFileIndex extends Logging {
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
listingModeOverride)
}
+ var pathFilterOptimizedListingEnabled = getConfigValue(options, sqlConf,
+
DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
null)
+ if (pathFilterOptimizedListingEnabled != null) {
+
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
+ pathFilterOptimizedListingEnabled)
+ } else {
+ // Also allow passing in the path filter config via Spark session conf
for convenience
+ pathFilterOptimizedListingEnabled = getConfigValue(options, sqlConf,
+ "spark." +
DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
null)
+ if (pathFilterOptimizedListingEnabled != null) {
+
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
+ pathFilterOptimizedListingEnabled)
+ }
+ }
+
if (tableConfig != null) {
properties.setProperty(RECORDKEY_FIELD.key,
tableConfig.getRecordKeyFields.orElse(Array.empty).mkString(","))
properties.setProperty(PARTITIONPATH_FIELD.key,
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).orElse(""))
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 567053c618a7..077e1129eb5e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -22,14 +22,16 @@ import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType,
extractEqualityPredicatesLiteralValues, generateFieldMap,
haveProperPartitionValues, shouldListLazily,
shouldUsePartitionPathPrefixAnalysis, shouldValidatePartitionColumns}
import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieCommonConfig, TypedProperties}
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
+import org.apache.hudi.hadoop.HoodieLatestBaseFilesPathFilter
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.internal.schema.Types.RecordType
import org.apache.hudi.internal.schema.utils.Conversions
@@ -88,6 +90,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
deduceQueryType(configProperties),
queryPaths.asJava,
toJavaOption(specifiedQueryInstant),
+
configProperties.getBoolean(FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
+ FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.defaultValue()),
false,
false,
SparkHoodieTableFileIndex.adapt(fileStatusCache),
@@ -439,6 +443,22 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
private def arePartitionPathsUrlEncoded: Boolean =
metaClient.getTableConfig.getUrlEncodePartitioning.toBoolean
+
+ override protected def getPartitionPathFilter(activeTimeline:
HoodieTimeline):
org.apache.hudi.common.util.Option[org.apache.hudi.storage.StoragePathFilter] =
{
+ if (useLatestBaseFilesPathFilterForListing &&
!shouldIncludePendingCommits) {
+ // Use getStorageConfWithCopy to avoid mutating the shared Spark session
config
+ val conf =
HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)
+ if (specifiedQueryInstant.isDefined) {
+ conf.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(),
specifiedQueryInstant.get)
+ }
+ org.apache.hudi.common.util.Option.of(
+ new HoodieLatestBaseFilesPathFilter(conf, metaClient,
+ activeTimeline.filterCompletedInstantsOrRewriteTimeline()))
+ } else {
+ org.apache.hudi.common.util.Option.empty()
+ }
+ }
+
}
object SparkHoodieTableFileIndex extends SparkAdapterSupport {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestROPathFilterOnRead.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestROPathFilterOnRead.scala
new file mode 100644
index 000000000000..8327cf7a8408
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestROPathFilterOnRead.scala
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+/**
+ * Tests for ROPathFilter optimization with advanced scenarios and edge cases.
+ */
+class TestROPathFilterAdvanced extends HoodieSparkSqlTestBase {
+
+ val RO_PATH_FILTER_OPT_KEY =
"hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter"
+
+ test("Test ROPathFilter with empty table") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | orderingFields = 'ts'
+ | )
+ """.stripMargin)
+
+ // Query empty table with ROPathFilter enabled
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ val result = spark.sql(s"select * from $tableName").collect()
+ assert(result.length == 0)
+ }
+ }
+ }
+
+ test("Test ROPathFilter with partition pruning") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | orderingFields = 'ts'
+ | )
+ | partitioned by (dt)
+ """.stripMargin)
+
+ // Query empty table with ROPathFilter enabled
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ val result = spark.sql(s"select * from $tableName").collect()
+ assert(result.length == 0)
+ }
+
+ // Insert data across multiple partitions
+ spark.sql(s"""insert into $tableName values(1, "a1", 10.0, 1000,
"2024-01-01")""")
+ spark.sql(s"""insert into $tableName values(2, "a2", 20.0, 2000,
"2024-01-02")""")
+
+ // Update data in first partition
+ spark.sql(s"update $tableName set price = 15.0 where id = 1")
+
+ // Query single partition with ROPathFilter
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where dt
= '2024-01-01'")(
+ Seq(1, "a1", 15.0, 1000, "2024-01-01")
+ )
+ }
+ }
+ }
+
+ test("Test ROPathFilter with concurrent inserts to different partitions") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | region string
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | orderingFields = 'ts'
+ | )
+ | partitioned by (region)
+ """.stripMargin)
+
+ // Insert data to different partitions
+ spark.sql(s"""insert into $tableName values(1, "a1", 10.0, 1000,
"US")""")
+ spark.sql(s"""insert into $tableName values(2, "a2", 20.0, 2000,
"EU")""")
+ spark.sql(s"""insert into $tableName values(3, "a3", 30.0, 3000,
"APAC")""")
+ spark.sql(s"""insert into $tableName values(4, "a4", 40.0, 4000,
"US")""")
+
+ // Query all data with ROPathFilter
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ checkAnswer(s"select id, name, price, ts, region from $tableName order
by id")(
+ Seq(1, "a1", 10.0, 1000, "US"),
+ Seq(2, "a2", 20.0, 2000, "EU"),
+ Seq(3, "a3", 30.0, 3000, "APAC"),
+ Seq(4, "a4", 40.0, 4000, "US")
+ )
+ }
+ }
+ }
+
+ test("Test ROPathFilter with multiple deletes and updates") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | orderingFields = 'ts'
+ | )
+ """.stripMargin)
+
+ // Insert initial data
+ for (i <- 1 to 10) {
+ spark.sql(s"""insert into $tableName values($i, "name$i", ${i * 10.0},
${i * 1000})""")
+ }
+
+ // Perform mix of updates and deletes
+ spark.sql(s"update $tableName set price = price * 2 where id % 2 = 0")
+ spark.sql(s"delete from $tableName where id % 3 = 0")
+
+ // Query with ROPathFilter
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ val result = spark.sql(s"select id, name, price, ts from $tableName
order by id").collect()
+ // Should have deleted records where id % 3 = 0 (3, 6, 9)
+ // Should have doubled price for even ids (2, 4, 8, 10)
+ assert(result.length == 7) // 10 - 3 deleted = 7
+
+ // Check a few specific values
+ val row2 = result.find(_.getInt(0) == 2).get
+ assert(row2.getDouble(2) == 40.0) // doubled from 20.0
+
+ val row5 = result.find(_.getInt(0) == 5).get
+ assert(row5.getDouble(2) == 50.0) // not doubled (odd)
+ }
+ }
+ }
+
+ test("Test ROPathFilter with mixed partition and non-partition columns in
filter") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | category string
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | orderingFields = 'ts'
+ | )
+ | partitioned by (category)
+ """.stripMargin)
+
+ // Insert data
+ spark.sql(s"""insert into $tableName values(1, "a1", 10.0, 1000,
"electronics")""")
+ spark.sql(s"""insert into $tableName values(2, "a2", 20.0, 2000,
"electronics")""")
+ spark.sql(s"""insert into $tableName values(3, "a3", 30.0, 3000,
"books")""")
+ spark.sql(s"""insert into $tableName values(4, "a4", 40.0, 4000,
"books")""")
+
+ // Update some records
+ spark.sql(s"update $tableName set price = 15.0 where id = 1")
+
+ // Query with both partition and data filters
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ checkAnswer(s"select id, name, price, ts, category from $tableName
where category = 'electronics' and price > 12.0 order by id")(
+ Seq(1, "a1", 15.0, 1000, "electronics"),
+ Seq(2, "a2", 20.0, 2000, "electronics")
+ )
+ }
+ }
+ }
+
+ test("Test ROPathFilter correctness with complex update patterns") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | version int,
+ | data string,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | orderingFields = 'ts'
+ | )
+ """.stripMargin)
+
+ // Insert and update the same record multiple times
+ spark.sql(s"""insert into $tableName values(1, 1, "initial", 1000)""")
+ spark.sql(s"""update $tableName set version = 2, data = "updated_v2"
where id = 1""")
+ spark.sql(s"""update $tableName set version = 3, data = "updated_v3"
where id = 1""")
+ spark.sql(s"""update $tableName set version = 4, data = "updated_v4"
where id = 1""")
+
+ // Insert another record
+ spark.sql(s"""insert into $tableName values(2, 1, "second_record",
2000)""")
+
+ // Query with ROPathFilter should return only latest versions
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ checkAnswer(s"select id, version, data, ts from $tableName order by
id")(
+ Seq(1, 4, "updated_v4", 1000),
+ Seq(2, 1, "second_record", 2000)
+ )
+ }
+
+ // Without ROPathFilter should still return same results (correct
filtering)
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "false") {
+ checkAnswer(s"select id, version, data, ts from $tableName order by
id")(
+ Seq(1, 4, "updated_v4", 1000),
+ Seq(2, 1, "second_record", 2000)
+ )
+ }
+ }
+ }
+
+ test("Test ROPathFilter with time travel queries") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | orderingFields = 'ts'
+ | )
+ """.stripMargin)
+
+ // Insert initial data (commit 1) - use single insert to ensure all
records are in same commit
+ spark.sql(s"""insert into $tableName values
+ |(1, "v1_name1", 10.0, 1000),
+ |(2, "v1_name2", 20.0, 2000),
+ |(3, "v1_name3", 30.0, 3000)""".stripMargin)
+
+ // Get first commit timestamp
+ val commit1 = spark.sql(s"select distinct(_hoodie_commit_time) from
$tableName").collect()(0).getString(0)
+
+ // Update data (commit 2)
+ spark.sql(s"""update $tableName set name = "v2_name1", price = 15.0
where id = 1""")
+ val commit2 = spark.sql(s"select distinct(_hoodie_commit_time) from
$tableName where id = 1").collect()(0).getString(0)
+
+ // Update data again (commit 3)
+ spark.sql(s"""update $tableName set name = "v3_name2", price = 25.0
where id = 2""")
+ val commit3 = spark.sql(s"select distinct(_hoodie_commit_time) from
$tableName where id = 2").collect()(0).getString(0)
+
+ // Delete a record (commit 4)
+ spark.sql(s"""delete from $tableName where id = 3""")
+
+ // Test current state with ROPathFilter
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+ Seq(1, "v2_name1", 15.0, 1000),
+ Seq(2, "v3_name2", 25.0, 2000)
+ )
+ }
+
+ // Test time travel to commit 1 with ROPathFilter
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ checkAnswer(s"select id, name, price, ts from $tableName timestamp as
of '$commit1' order by id")(
+ Seq(1, "v1_name1", 10.0, 1000),
+ Seq(2, "v1_name2", 20.0, 2000),
+ Seq(3, "v1_name3", 30.0, 3000)
+ )
+ }
+
+ // Test time travel to commit 2 with ROPathFilter
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ checkAnswer(s"select id, name, price, ts from $tableName timestamp as
of '$commit2' order by id")(
+ Seq(1, "v2_name1", 15.0, 1000),
+ Seq(2, "v1_name2", 20.0, 2000),
+ Seq(3, "v1_name3", 30.0, 3000)
+ )
+ }
+
+ // Test time travel to commit 3 with ROPathFilter
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ checkAnswer(s"select id, name, price, ts from $tableName timestamp as
of '$commit3' order by id")(
+ Seq(1, "v2_name1", 15.0, 1000),
+ Seq(2, "v3_name2", 25.0, 2000),
+ Seq(3, "v1_name3", 30.0, 3000)
+ )
+ }
+
+ // Verify time travel returns same results with and without ROPathFilter
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "false") {
+ checkAnswer(s"select id, name, price, ts from $tableName timestamp as
of '$commit1' order by id")(
+ Seq(1, "v1_name1", 10.0, 1000),
+ Seq(2, "v1_name2", 20.0, 2000),
+ Seq(3, "v1_name3", 30.0, 3000)
+ )
+ }
+ }
+ }
+}