This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5fb1b34a6c38 fix: Fail metadata bootstrap early in presence of 0 byte
file (#18209)
5fb1b34a6c38 is described below
commit 5fb1b34a6c389ec1a4a3b332b22eb94851676da0
Author: Surya Prasanna <[email protected]>
AuthorDate: Wed Feb 25 13:53:16 2026 -0800
fix: Fail metadata bootstrap early in presence of 0 byte file (#18209)
---
.../hudi/metadata/HoodieMetadataPayload.java | 8 ++-
.../TestCopyOnWriteRollbackActionExecutor.java | 3 +-
.../hudi/functional/TestRecordLevelIndex.scala | 79 ++++++++++++++++++++--
3 files changed, 82 insertions(+), 8 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 651fa61eedc5..ac509fce962e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -317,7 +317,13 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
int size = filesAdded.size() + filesDeleted.size();
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(size, 1);
- filesAdded.forEach((fileName, fileSize) -> fileInfo.put(fileName, new
HoodieMetadataFileInfo(fileSize, false)));
+ filesAdded.forEach((fileName, fileSize) -> {
+ // Assert that the file-size of the file being added is positive, since
Hudi
+ // should not be creating empty files
+ checkState(fileSize > 0, "File name " + fileName
+ + " is a 0 byte file; it does not have any contents");
+ fileInfo.put(fileName, new HoodieMetadataFileInfo(fileSize, false));
+ });
filesDeleted.forEach(fileName -> fileInfo.put(fileName,
DELETE_FILE_METADATA));
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 4a890bc2e1b7..343e406ddacd 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -380,7 +380,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
// we are using test table infra. So, col stats are not populated.
HoodieTable table =
this.getHoodieTable(metaClient,
getConfigBuilder().withRollbackUsingMarkers(true).withRollbackBackupEnabled(true)
-
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false)
+ .withMetadataIndexColumnStats(false).build())
.build());
HoodieInstant needRollBackInstant = HoodieTestUtils.getCompleteInstant(
metaClient.getStorage(), metaClient.getTimelinePath(),
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 367312fd6735..cf63ea915630 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -32,21 +32,22 @@ import
org.apache.hudi.common.testutils.{HoodieTestDataGenerator, InProcessTimeG
import
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig,
HoodieWriteConfig}
+import org.apache.hudi.exception.{HoodieException, HoodieMetadataException}
import
org.apache.hudi.functional.TestRecordLevelIndex.TestPartitionedRecordLevelIndexTestCase
import org.apache.hudi.index.HoodieIndex.IndexType.RECORD_LEVEL_INDEX
import org.apache.hudi.index.record.HoodieRecordIndex
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieTableMetadataUtil, MetadataPartitionType}
-import org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
import
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.functions.lit
import org.junit.jupiter.api.{Tag, Test}
-import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals,
assertFalse, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals,
assertFalse, assertThrows, assertTrue, fail}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource,
ValueSource}
+import java.io.{PrintWriter, StringWriter}
import java.util
import java.util.stream.Collectors
@@ -105,7 +106,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase
with SparkDatasetMix
assertEquals(recordKeys.size(),
getRecordIndexEntries(metadataBeforeRebootstrap, recordKeys,
localDataGen.getPartitionPaths.toSeq).size,
"Record index entries should match inserted records after first batch")
- assertTrue(storage.exists(new
StoragePath(getMetadataTableBasePath(basePath))),
+ assertTrue(storage.exists(new
StoragePath(HoodieTableMetadata.getMetadataTableBasePath(basePath))),
"Metadata table should exist before deletion")
// Remove _hoodie_partition_metadata from one data partition.
@@ -114,7 +115,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase
with SparkDatasetMix
// Delete metadata table and force a full metadata rebootstrap.
metaClient = HoodieTableMetaClient.reload(metaClient)
HoodieTableMetadataUtil.deleteMetadataTable(metaClient, context, false)
- assertFalse(storage.exists(new
StoragePath(getMetadataTableBasePath(basePath))),
+ assertFalse(storage.exists(new
StoragePath(HoodieTableMetadata.getMetadataTableBasePath(basePath))),
"Metadata table should be removed before rebootstrap")
// Rebootstrap should succeed even when one partition metadata file is
missing.
@@ -123,7 +124,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase
with SparkDatasetMix
val metadataAfterRebootstrap =
metadataWriter(writeConfig).getTableMetadata.asInstanceOf[HoodieBackedTableMetadata]
// Verify the record_index partition is created after rebootstrap.
- val recordIndexPath = new StoragePath(getMetadataTableBasePath(basePath),
MetadataPartitionType.RECORD_INDEX.getPartitionPath)
+ val recordIndexPath = new
StoragePath(HoodieTableMetadata.getMetadataTableBasePath(basePath),
MetadataPartitionType.RECORD_INDEX.getPartitionPath)
assertTrue(storage.exists(recordIndexPath),
"Record index partition should exist after metadata rebootstrap")
@@ -609,6 +610,72 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase with SparkDatasetMix
validateDataAndRecordIndices(hudiOpts,
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(deletedRecords).asScala.toSeq,
1)))
deleteDf.unpersist()
}
+
+ @Test
+ def testRecordIndexRebootstrapWithZeroByteBaseFile(): Unit = {
+ val insertedRecords = 30
+ val localDataGen = new HoodieTestDataGenerator()
+ val inserts = localDataGen.generateInserts("001", insertedRecords)
+ val insertDf = toDataset(spark, inserts)
+ val optionsWithoutRecordIndex = Map(HoodieWriteConfig.TBL_NAME.key ->
"hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key ->
HoodieTableType.COPY_ON_WRITE.name(),
+ RECORDKEY_FIELD.key -> "_row_key",
+ PARTITIONPATH_FIELD.key -> "partition_path",
+ HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+ HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() ->
"false",
+ HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "false",
+ HoodieCompactionConfig.INLINE_COMPACT.key() -> "false")
+
+ // Create first commit with record_index disabled.
+ insertDf.write.format("hudi")
+ .options(optionsWithoutRecordIndex)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ assertEquals(insertedRecords,
spark.read.format("hudi").load(basePath).count())
+
+ // Corrupt one base parquet file by replacing it with an empty file.
+ val corruptedBaseFileName =
replaceOneBaseFileWithEmpty(localDataGen.getPartitionPaths.toSeq)
+
+ // Delete metadata table to force rebootstrap.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ HoodieTableMetadataUtil.deleteMetadataTable(metaClient, context, false)
+ assertFalse(storage.exists(new
StoragePath(HoodieTableMetadata.getMetadataTableBasePath(basePath))),
+ "Metadata table should be removed before rebootstrap")
+
+ // Rebootstrap metadata with record_index enabled should still succeed.
+ metaClient.reloadActiveTimeline()
+ val latestSchema = new
TableSchemaResolver(metaClient).getTableSchemaFromLatestCommit(false).get().toString
+ val optionsWithRecordIndex = optionsWithoutRecordIndex ++ Map(
+ HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+ HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name(),
+ HoodieWriteConfig.AVRO_SCHEMA_STRING.key() -> latestSchema)
+ val writeConfig = getWriteConfig(optionsWithRecordIndex)
+ try {
+ metadataWriter(writeConfig).getTableMetadata
+ } catch {
+ case e: HoodieMetadataException =>
+ val stackTraceWriter = new StringWriter()
+ e.printStackTrace(new PrintWriter(stackTraceWriter))
+ val stackTraceText = stackTraceWriter.toString
+ assertTrue(stackTraceText.contains(corruptedBaseFileName),
+ s"Expected HoodieMetadataException stack trace to contain corrupted
file name: $corruptedBaseFileName")
+ case t: Throwable =>
+ fail(s"Expected HoodieMetadataException but got ${t.getClass.getName}:
${t.getMessage}")
+ }
+ }
+
+ private def replaceOneBaseFileWithEmpty(partitionPaths: Seq[String]): String
= {
+ val candidateBaseFile = partitionPaths.view.flatMap { partition =>
+ storage.listDirectEntries(new StoragePath(basePath, partition)).asScala
+ .map(_.getPath)
+ .find(path => path.getName.endsWith(".parquet"))
+ }.headOption.getOrElse(throw new IllegalStateException("No base file found
to replace with empty file"))
+ assertTrue(storage.deleteFile(candidateBaseFile),
+ s"Failed to delete base file $candidateBaseFile")
+ assertTrue(storage.createNewFile(candidateBaseFile),
+ s"Failed to create empty replacement file $candidateBaseFile")
+ candidateBaseFile.getName
+ }
}
object TestRecordLevelIndex {