nsivabalan commented on code in PR #18136:
URL: https://github.com/apache/hudi/pull/18136#discussion_r2849257491
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -107,8 +110,9 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
@Getter(AccessLevel.PROTECTED)
private final List<StoragePath> queryPaths;
- private final boolean shouldIncludePendingCommits;
+ protected final boolean shouldIncludePendingCommits;
private final boolean shouldValidateInstant;
+ protected final boolean useROPathFilterForListing;
Review Comment:
   Suggest renaming this field to `useLatestBasePathFilterForListing` to better reflect what it does.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestROPathFilterOnRead:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+/**
+ * Tests for ROPathFilter optimization with advanced scenarios and edge cases.
+ */
+class TestROPathFilterAdvanced extends HoodieSparkSqlTestBase {
+
+ val RO_PATH_FILTER_OPT_KEY =
"hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter"
+
+ test("Test ROPathFilter with empty table") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | orderingFields = 'ts'
+ | )
+ """.stripMargin)
+
+ // Query empty table with ROPathFilter enabled
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ val result = spark.sql(s"select * from $tableName").collect()
+ assert(result.length == 0)
+ }
+ }
+ }
+
+ test("Test ROPathFilter with partition pruning") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | orderingFields = 'ts'
+ | )
+ | partitioned by (dt)
+ """.stripMargin)
+
+ // Query empty table with ROPathFilter enabled
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ val result = spark.sql(s"select * from $tableName").collect()
+ assert(result.length == 0)
+ }
+
+ // Insert data across multiple partitions
+ spark.sql(s"""insert into $tableName values(1, "a1", 10.0, 1000,
"2024-01-01")""")
+ spark.sql(s"""insert into $tableName values(2, "a2", 20.0, 2000,
"2024-01-02")""")
+
+ // Update data in first partition
+ spark.sql(s"update $tableName set price = 15.0 where id = 1")
+
+ // Query single partition with ROPathFilter
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where dt
= '2024-01-01'")(
+ Seq(1, "a1", 15.0, 1000, "2024-01-01")
+ )
+ }
+ }
+ }
+
+ test("Test ROPathFilter with concurrent inserts to different partitions") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | region string
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | orderingFields = 'ts'
+ | )
+ | partitioned by (region)
+ """.stripMargin)
+
+ // Insert data to different partitions
+ spark.sql(s"""insert into $tableName values(1, "a1", 10.0, 1000,
"US")""")
+ spark.sql(s"""insert into $tableName values(2, "a2", 20.0, 2000,
"EU")""")
+ spark.sql(s"""insert into $tableName values(3, "a3", 30.0, 3000,
"APAC")""")
+ spark.sql(s"""insert into $tableName values(4, "a4", 40.0, 4000,
"US")""")
+
+ // Query all data with ROPathFilter
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ checkAnswer(s"select id, name, price, ts, region from $tableName order
by id")(
+ Seq(1, "a1", 10.0, 1000, "US"),
+ Seq(2, "a2", 20.0, 2000, "EU"),
+ Seq(3, "a3", 30.0, 3000, "APAC"),
+ Seq(4, "a4", 40.0, 4000, "US")
+ )
+ }
+ }
+ }
+
+ test("Test ROPathFilter with multiple deletes and updates") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | orderingFields = 'ts'
+ | )
+ """.stripMargin)
+
+ // Insert initial data
+ for (i <- 1 to 10) {
+ spark.sql(s"""insert into $tableName values($i, "name$i", ${i * 10.0},
${i * 1000})""")
+ }
+
+ // Perform mix of updates and deletes
+ spark.sql(s"update $tableName set price = price * 2 where id % 2 = 0")
+ spark.sql(s"delete from $tableName where id % 3 = 0")
+
+ // Query with ROPathFilter
+ withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+ val result = spark.sql(s"select id, name, price, ts from $tableName
order by id").collect()
+ // Should have deleted records where id % 3 = 0 (3, 6, 9)
+ // Should have doubled price for even ids (2, 4, 8, 10)
+ assert(result.length == 7) // 10 - 3 deleted = 7
Review Comment:
@suryaprasanna : did you get a chance to address this?
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>>
loadFileSlicesForPartitions(List<Par
validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
}
- List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
HoodieTimeline activeTimeline = getActiveTimeline();
Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::requestedTime));
+ validate(activeTimeline, queryInstant);
- try (HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
- Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::requestedTime));
- validate(activeTimeline, queryInstant);
+ HoodieTimer timer = HoodieTimer.start();
+ List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions,
activeTimeline);
+ log.info("On {} with query instant as {}, it took {}ms to list all files
{} Hudi partitions",
+ metaClient.getTableConfig().getTableName(), queryInstant.map(instant
-> instant).orElse("N/A"),
Review Comment:
   Wasn't the suggestion to go with `queryInstant.orElse("N/A")`? The
   `queryInstant.map(instant -> instant)` is an identity mapping and is redundant.
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +276,69 @@ private Map<PartitionPath, List<FileSlice>>
loadFileSlicesForPartitions(List<Par
validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
}
- List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
HoodieTimeline activeTimeline = getActiveTimeline();
Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::requestedTime));
+ validate(activeTimeline, queryInstant);
- try (HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
- Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::requestedTime));
- validate(activeTimeline, queryInstant);
+ HoodieTimer timer = HoodieTimer.start();
+ List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions,
activeTimeline);
+ log.info("On {} with query instant as {}, it took {}ms to list all files
{} Hudi partitions",
+ metaClient.getTableConfig().getTableName(), queryInstant.map(instant
-> instant).orElse("N/A"),
+ timer.endTimer(), partitions.size());
+
+ // ROPathFilter optimization is only applicable for COW tables with
snapshot queries
+ // For MOR tables, we need log files which are not returned by
HoodieROTablePathFilter
+ if (useROPathFilterForListing
+ && !shouldIncludePendingCommits
+ && metaClient.getTableConfig().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
Review Comment:
   For MOR tables, if the query type is read-optimized (RO), we could also afford
   to apply this filtering, right?
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>>
loadFileSlicesForPartitions(List<Par
validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
}
- List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
HoodieTimeline activeTimeline = getActiveTimeline();
Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::requestedTime));
+ validate(activeTimeline, queryInstant);
- try (HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
- Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::requestedTime));
- validate(activeTimeline, queryInstant);
+ HoodieTimer timer = HoodieTimer.start();
+ List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions,
activeTimeline);
+ log.info("On {} with query instant as {}, it took {}ms to list all files
{} Hudi partitions",
+ metaClient.getTableConfig().getTableName(), queryInstant.map(instant
-> instant).orElse("N/A"),
+ timer.endTimer(), partitions.size());
+
+ if (useROPathFilterForListing && !shouldIncludePendingCommits) {
+ // Group files by partition path, then by file group ID
+ Map<String, PartitionPath> partitionsMap = new HashMap<>();
+ partitions.forEach(p -> partitionsMap.put(p.path, p));
+ Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new
HashMap<>();
+
+ for (StoragePathInfo pathInfo : allFiles) {
+ // Create FileSlice obj from StoragePathInfo.
+ String partitionPathStr = pathInfo.getPath().getParent().toString();
+ String relPartitionPath = FSUtils.getRelativePartitionPath(basePath,
pathInfo.getPath().getParent());
+ HoodieBaseFile baseFile = new HoodieBaseFile(pathInfo);
+ FileSlice fileSlice = new FileSlice(partitionPathStr,
baseFile.getCommitTime(), baseFile.getFileId());
+ fileSlice.setBaseFile(baseFile);
+
+ // Add the FileSlice to partitionToFileSlices
+ PartitionPath partitionPathObj = partitionsMap.get(relPartitionPath);
+ List<FileSlice> fileSlices =
partitionToFileSlices.computeIfAbsent(partitionPathObj, k -> new ArrayList<>());
+ fileSlices.add(fileSlice);
Review Comment:
   Yes, I see that, but building the FSV here is not invoking the distributed Spark
   context, and there is no metadata table involved (NoOpTableMetadata is used).
   It's happening in the driver, and we have just one file slice per file group, so
   constructing it should not add much overhead.
   On the plus side, we don't need to maintain additional custom code for when the
   path filter is enabled.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -527,6 +527,21 @@ object HoodieFileIndex extends Logging {
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
listingModeOverride)
}
+ var pathFilterOptimizedListingEnabled = getConfigValue(options, sqlConf,
+
DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
null)
+ if (pathFilterOptimizedListingEnabled != null) {
+
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
+ pathFilterOptimizedListingEnabled)
+ } else {
+ // Also allow passing in the path filter config via Spark session conf
for convenience
+ pathFilterOptimizedListingEnabled = getConfigValue(options, sqlConf,
+ "spark." +
DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
null)
Review Comment:
   Shouldn't your other patch fix this already? We should fix all config-key
   prefix stripping at some root level rather than littering it across the code base.
   https://github.com/apache/hudi/pull/18205
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTableStoragePathFilter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
+
+public class HoodieROTableStoragePathFilter implements StoragePathFilter {
Review Comment:
   How about renaming this class to `HoodieLatestBaseFilePathFilter`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]