This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0b80239a1d61 fix: Disable column stats and partition stats indices for
Lance base files (#18588)
0b80239a1d61 is described below
commit 0b80239a1d61b7de0d1346c16560a61eef527271
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue May 19 16:30:51 2026 -0700
fix: Disable column stats and partition stats indices for Lance base files
(#18588)
---
.../hudi/metadata/MetadataPartitionType.java | 13 +++-
.../hudi/functional/TestLanceDataSource.scala | 75 +++++++++++++++-------
2 files changed, 63 insertions(+), 25 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 7944f0018127..04bd9bdab264 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
import org.apache.hudi.avro.model.HoodieSecondaryIndexInfo;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -108,7 +109,11 @@ public enum MetadataPartitionType {
COLUMN_STATS(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, "col-stats-", 3) {
@Override
public boolean isMetadataPartitionEnabled(HoodieMetadataConfig metadataConfig, HoodieTableConfig tableConfig) {
- return metadataConfig.isColumnStatsIndexEnabled();
+ // Lance base files do not yet emit column-range metadata, so per-file column stats
+ // aggregate as empty entries and silently prune everything on read. Disable until
+ // HoodieTableMetadataUtil#readColumnRangeMetadataFrom has a LANCE branch.
+ return tableConfig.getBaseFileFormat() != HoodieFileFormat.LANCE
+ && metadataConfig.isColumnStatsIndexEnabled();
}
@Override
@@ -240,7 +245,11 @@ public enum MetadataPartitionType {
PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, "partition-stats-", 6) {
@Override
public boolean isMetadataPartitionEnabled(HoodieMetadataConfig metadataConfig, HoodieTableConfig tableConfig) {
- return tableConfig.isTablePartitioned() && metadataConfig.isPartitionStatsIndexEnabled();
+ // Partition stats aggregate per-file column ranges. Lance base files contribute none
+ // (see HoodieTableMetadataUtil#readColumnRangeMetadataFrom), so partition records end
+ // up with empty ranges and the partition stats index prunes everything on read.
+ return tableConfig.getBaseFileFormat() != HoodieFileFormat.LANCE
+ && tableConfig.isTablePartitioned() && metadataConfig.isPartitionStatsIndexEnabled();
}
@Override
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
index 1cd5647d9949..093b6ee30873 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.view.{FileSystemViewManager, FileSystemViewS
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.io.storage.HoodieSparkLanceReader
+import org.apache.hudi.metadata.MetadataPartitionType
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.HoodieSparkClientTestBase
@@ -88,6 +89,23 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
}
}
+ // For MOR tables, a compaction is recorded on the active timeline with action == "commit"
+ // (vs. "deltacommit" for regular writes). Builds a fresh meta client so callers see the
+ // current on-disk timeline state.
+ private def assertCompactionCommitPresence(tablePath: String, expectPresent: Boolean, message: String): Unit = {
+ val compactionCommits = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .setBasePath(tablePath)
+ .build()
+ .getActiveTimeline.filterCompletedInstants().getInstants.asScala
+ .filter(instant => instant.getAction == "commit")
+ if (expectPresent) {
+ assertTrue(compactionCommits.nonEmpty, message)
+ } else {
+ assertTrue(compactionCommits.isEmpty, message)
+ }
+ }
+
@ParameterizedTest
@EnumSource(value = classOf[HoodieTableType])
def testBasicWriteAndRead(tableType: HoodieTableType): Unit = {
@@ -1149,6 +1167,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
// Disable small file handling so the next insert creates a new file group
// and updates in MOR generate log file(s)
spark.sql(s"alter table $tableName set tblproperties ('hoodie.merge.small.file.group.candidates.limit' = '0')")
+ spark.sql(s"alter table $tableName set tblproperties ('hoodie.compact.inline.max.delta.commits' = '6')")
// Test 3: INSERT with subset of columns (null handling)
spark.sql(s"""
@@ -1174,15 +1193,18 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
)
// Test 5: DELETE a row
- // TODO(#18558): test DELETE with MOR table type once the bug is fixed
- if (tableType == HoodieTableType.COPY_ON_WRITE) {
- spark.sql(s"delete from $tableName where id = 3")
+ spark.sql(s"delete from $tableName where id = 3")
+ checkAnswer(s"select id, name, age, score, dt from $tableName order by id")(
+ Seq(1, "Alice", 31, 99.9, "2025-01-01"),
+ Seq(2, "Bob", 25, 87.3, "2025-01-02"),
+ Seq(4, "Diana", 40, null, "2025-01-01")
+ )
- checkAnswer(s"select id, name, age, score, dt from $tableName order by id")(
- Seq(1, "Alice", 31, 99.9, "2025-01-01"),
- Seq(2, "Bob", 25, 87.3, "2025-01-02"),
- Seq(4, "Diana", 40, null, "2025-01-01")
- )
+ // For MOR: 5 deltacommits so far (insert x3, update, delete) — below the
+ // max.delta.commits=6 threshold, so no inline compaction should have run yet.
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ assertCompactionCommitPresence(tablePath, expectPresent = false,
+ "No compaction commit should be present before max.delta.commits=6 threshold is reached")
}
// Test 6: INSERT with static partition (only for partitioned tables)
@@ -1192,21 +1214,18 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
values (28, 5, 'Eve')
""".stripMargin)
- if (tableType == HoodieTableType.COPY_ON_WRITE) {
- checkAnswer(s"select id, name, age, score, dt from $tableName order by id")(
- Seq(1, "Alice", 31, 99.9, "2025-01-01"),
- Seq(2, "Bob", 25, 87.3, "2025-01-02"),
- Seq(4, "Diana", 40, null, "2025-01-01"),
- Seq(5, "Eve", 28, null, "2025-01-05")
- )
- } else {
- checkAnswer(s"select id, name, age, score, dt from $tableName order by id")(
- Seq(1, "Alice", 31, 99.9, "2025-01-01"),
- Seq(2, "Bob", 25, 87.3, "2025-01-02"),
- Seq(3, "Charlie", 35, 92.1, "2025-01-02"),
- Seq(4, "Diana", 40, null, "2025-01-01"),
- Seq(5, "Eve", 28, null, "2025-01-05")
- )
+ checkAnswer(s"select id, name, age, score, dt from $tableName order by id")(
+ Seq(1, "Alice", 31, 99.9, "2025-01-01"),
+ Seq(2, "Bob", 25, 87.3, "2025-01-02"),
+ Seq(4, "Diana", 40, null, "2025-01-01"),
+ Seq(5, "Eve", 28, null, "2025-01-05")
+ )
+
+ // For MOR: this is the 6th deltacommit, which should trigger inline compaction
+ // (HoodieSparkSqlWriter auto-enables hoodie.compact.inline for MOR batch writes).
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ assertCompactionCommitPresence(tablePath, expectPresent = true,
+ "Inline compaction commit should be present after 6th deltacommit")
}
}
@@ -1219,6 +1238,16 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
val baseFileFormat = metaClient.getTableConfig.getBaseFileFormat
assertEquals(HoodieFileFormat.LANCE, baseFileFormat,
"Table should use Lance base file format")
+
+ // Column stats and partition stats indices are gated off for Lance base files in
+ // MetadataPartitionType — per-file column ranges aren't emitted for Lance yet, and
+ // empty ranges would silently prune everything on read. Confirm the metadata table
+ // never initialized these partitions.
+ val tableConfig = metaClient.getTableConfig
+ assertFalse(tableConfig.isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS),
+ "Column stats metadata partition must not be initialized for Lance tables")
+ assertFalse(tableConfig.isMetadataPartitionAvailable(MetadataPartitionType.PARTITION_STATS),
+ "Partition stats metadata partition must not be initialized for Lance tables")
}
/**