This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 81ba19813df3 [HUDI-9626] Prefetch HFiles for metadata table if size is
below a configured threshold (#13567)
81ba19813df3 is described below
commit 81ba19813df3ccea1e8b44f90dc1d4b31c0d7d2a
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Tue Jul 29 04:23:47 2025 -0700
[HUDI-9626] Prefetch HFiles for metadata table if size is below a
configured threshold (#13567)
* Add a new write config;
* Add HFileReaderFactory;
* Add a test for the factory class.
---------
Co-authored-by: rmahindra123 <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/common/config/HoodieMetadataConfig.java | 12 ++
.../apache/hudi/io/storage/HFileReaderFactory.java | 123 ++++++++++++
.../io/storage/HoodieNativeAvroHFileReader.java | 56 ++----
.../hudi/io/storage/TestHFileReaderFactory.java | 215 +++++++++++++++++++++
.../storage/TestHoodieNativeAvroHFileReader.java | 9 +-
.../io/hadoop/HoodieAvroFileReaderFactory.java | 11 +-
.../io/hadoop/TestHoodieHFileReaderWriter.java | 13 +-
7 files changed, 391 insertions(+), 48 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 66f89e125097..19d18157eee3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -496,6 +496,14 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.withDocumentation("Default number of partitions to use when
repartitioning is needed. "
+ "This provides a reasonable level of parallelism for metadata
table operations.");
+ public static final ConfigProperty<Integer> METADATA_FILE_CACHE_MAX_SIZE_MB
= ConfigProperty
+ .key(METADATA_PREFIX + ".file.cache.max.size.mb")
+ .defaultValue(0)
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Max size in MB below which metadata file (HFile)
will be downloaded "
+ + "and cached entirely for the HFileReader.");
+
  public long getMaxLogFileSize() {
    // Configured maximum metadata log file size in bytes
    // (backed by MAX_LOG_FILE_SIZE_BYTES_PROP).
    return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
  }
@@ -729,6 +737,10 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return getInt(REPARTITION_DEFAULT_PARTITIONS);
}
  public int getFileCacheMaxSizeMB() {
    // Size threshold in MB at or below which a metadata HFile is downloaded
    // and cached entirely before reading. The default of 0 effectively
    // disables prefetching (only zero-length files would qualify).
    return getInt(METADATA_FILE_CACHE_MAX_SIZE_MB);
  }
+
/**
* Checks if a specific metadata index is marked for dropping based on the
metadata configuration.
* NOTE: Only applicable for secondary indexes (SI) or expression indexes
(EI).
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java
new file mode 100644
index 000000000000..665c38c14c62
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Either;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.io.hfile.HFileReader;
+import org.apache.hudi.io.hfile.HFileReaderImpl;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import java.io.IOException;
+
+/**
+ * Factory class to provide the implementation for
+ * the HFile Reader for {@link HoodieNativeAvroHFileReader}.
+ */
+public class HFileReaderFactory {
+
+ private final HoodieStorage storage;
+ private final HoodieMetadataConfig metadataConfig;
+ private final Either<StoragePath, byte[]> fileSource;
+
+ public HFileReaderFactory(HoodieStorage storage,
+ TypedProperties properties,
+ Either<StoragePath, byte[]> fileSource) {
+ this.storage = storage;
+ this.metadataConfig =
HoodieMetadataConfig.newBuilder().withProperties(properties).build();
+ this.fileSource = fileSource;
+ }
+
+ public HFileReader createHFileReader() throws IOException {
+ final long fileSize = determineFileSize();
+ final SeekableDataInputStream inputStream = createInputStream(fileSize);
+ return new HFileReaderImpl(inputStream, fileSize);
+ }
+
+ private long determineFileSize() throws IOException {
+ if (fileSource.isLeft()) {
+ return storage.getPathInfo(fileSource.asLeft()).getLength();
+ }
+ return fileSource.asRight().length;
+ }
+
+ private SeekableDataInputStream createInputStream(long fileSize) throws
IOException {
+ if (fileSource.isLeft()) {
+ if (fileSize <= (long) metadataConfig.getFileCacheMaxSizeMB() * 1024L *
1024L) {
+ // Download the whole file if the file size is below a configured
threshold
+ StoragePath path = fileSource.asLeft();
+ byte[] buffer;
+ try (SeekableDataInputStream stream = storage.openSeekable(path,
false)) {
+ buffer = new byte[(int) storage.getPathInfo(path).getLength()];
+ stream.readFully(buffer);
+ }
+ return new ByteArraySeekableDataInputStream(new
ByteBufferBackedInputStream(buffer));
+ }
+ return storage.openSeekable(fileSource.asLeft(), false);
+ }
+ return new ByteArraySeekableDataInputStream(new
ByteBufferBackedInputStream(fileSource.asRight()));
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private HoodieStorage storage;
+ private Option<TypedProperties> properties = Option.empty();
+ private Either<StoragePath, byte[]> fileSource;
+
+ public Builder withStorage(HoodieStorage storage) {
+ this.storage = storage;
+ return this;
+ }
+
+ public Builder withProps(TypedProperties props) {
+ this.properties = Option.of(props);
+ return this;
+ }
+
+ public Builder withPath(StoragePath path) {
+ ValidationUtils.checkState(fileSource == null, "HFile source already
set, cannot set path");
+ this.fileSource = Either.left(path);
+ return this;
+ }
+
+ public Builder withContent(byte[] bytesContent) {
+ ValidationUtils.checkState(fileSource == null, "HFile source already
set, cannot set bytes content");
+ this.fileSource = Either.right(bytesContent);
+ return this;
+ }
+
+ public HFileReaderFactory build() {
+ ValidationUtils.checkArgument(storage != null, "Storage cannot be null");
+ ValidationUtils.checkArgument(fileSource != null, "HFile source cannot
be null");
+ TypedProperties props = properties.isPresent() ? properties.get() : new
TypedProperties();
+ return new HFileReaderFactory(storage, props, fileSource);
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
index 94c8316fb030..c1bf11ee85e5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
@@ -29,19 +29,14 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.expression.Expression;
import org.apache.hudi.expression.Predicate;
import org.apache.hudi.expression.Predicates;
-import org.apache.hudi.io.ByteArraySeekableDataInputStream;
-import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.hfile.HFileReader;
-import org.apache.hudi.io.hfile.HFileReaderImpl;
import org.apache.hudi.io.hfile.KeyValue;
import org.apache.hudi.io.hfile.UTF8StringKey;
-import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.Lazy;
@@ -79,27 +74,17 @@ public class HoodieNativeAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
private static final Set<String> PRELOADED_META_INFO_KEYS = new HashSet<>(
Arrays.asList(KEY_MIN_RECORD, KEY_MAX_RECORD, SCHEMA_KEY));
- private final HoodieStorage storage;
- private final Option<StoragePath> path;
- private final Option<byte[]> bytesContent;
+ private final HFileReaderFactory readerFactory;
+ private final StoragePath path;
// In-memory cache for meta info
private final Map<String, byte[]> metaInfoMap;
private final Lazy<Schema> schema;
private boolean isMetaInfoLoaded = false;
private long numKeyValueEntries = -1L;
- public HoodieNativeAvroHFileReader(HoodieStorage storage, StoragePath path,
Option<Schema> schemaOption) {
- this.storage = storage;
- this.path = Option.of(path);
- this.bytesContent = Option.empty();
- this.metaInfoMap = new HashMap<>();
- this.schema = schemaOption.map(Lazy::eagerly).orElseGet(() ->
Lazy.lazily(this::fetchSchema));
- }
-
- public HoodieNativeAvroHFileReader(HoodieStorage storage, byte[] content,
Option<Schema> schemaOption) {
- this.storage = storage;
- this.path = Option.empty();
- this.bytesContent = Option.of(content);
  /**
   * Constructs the reader on top of a {@link HFileReaderFactory}, which owns
   * the decision of how the underlying HFile stream is opened.
   *
   * @param readerFactory factory used to create per-operation {@link HFileReader}s
   * @param path          path of the HFile (kept for reporting; the factory holds the actual source)
   * @param schemaOption  schema if already known; otherwise it is fetched lazily from the file
   */
  public HoodieNativeAvroHFileReader(HFileReaderFactory readerFactory, StoragePath path, Option<Schema> schemaOption) {
    this.readerFactory = readerFactory;
    this.path = path;
    this.metaInfoMap = new HashMap<>();
    this.schema = schemaOption.map(Lazy::eagerly).orElseGet(() -> Lazy.lazily(this::fetchSchema));
  }
@@ -113,7 +98,7 @@ public class HoodieNativeAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
"Schema projections are not supported in HFile reader");
}
- HFileReader reader = newHFileReader();
+ HFileReader reader = readerFactory.createHFileReader();
return new RecordIterator(reader, getSchema(), readerSchema);
}
@@ -130,7 +115,7 @@ public class HoodieNativeAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
@Override
public BloomFilter readBloomFilter() {
- try (HFileReader reader = newHFileReader()) {
+ try (HFileReader reader = readerFactory.createHFileReader()) {
ByteBuffer byteBuffer =
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK).get();
return BloomFilterFactory.fromByteBuffer(byteBuffer,
fromUTF8Bytes(reader.getMetaInfo(new
UTF8StringKey(KEY_BLOOM_FILTER_TYPE_CODE)).get()));
@@ -141,7 +126,7 @@ public class HoodieNativeAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
@Override
public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
- try (HFileReader reader = newHFileReader()) {
+ try (HFileReader reader = readerFactory.createHFileReader()) {
reader.seekTo();
// candidateRowKeys must be sorted
return new TreeSet<>(candidateRowKeys).stream()
@@ -163,7 +148,7 @@ public class HoodieNativeAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
@Override
public ClosableIterator<String> getRecordKeyIterator() throws IOException {
- HFileReader reader = newHFileReader();
+ HFileReader reader = readerFactory.createHFileReader();
return new ClosableIterator<String>() {
@Override
public boolean hasNext() {
@@ -220,7 +205,7 @@ public class HoodieNativeAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
@Override
public ClosableIterator<HoodieRecord<IndexedRecord>>
getRecordsByKeysIterator(
List<String> sortedKeys, Schema schema) throws IOException {
- HFileReader reader = newHFileReader();
+ HFileReader reader = readerFactory.createHFileReader();
ClosableIterator<IndexedRecord> iterator =
new RecordByKeyIterator(reader, sortedKeys, getSchema(), schema);
return new CloseableMappingIterator<>(
@@ -230,7 +215,7 @@ public class HoodieNativeAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
@Override
public ClosableIterator<HoodieRecord<IndexedRecord>>
getRecordsByKeyPrefixIterator(
List<String> sortedKeyPrefixes, Schema schema) throws IOException {
- HFileReader reader = newHFileReader();
+ HFileReader reader = readerFactory.createHFileReader();
ClosableIterator<IndexedRecord> iterator =
new RecordByKeyPrefixIterator(reader, sortedKeyPrefixes, getSchema(),
schema);
return new CloseableMappingIterator<>(
@@ -310,7 +295,7 @@ public class HoodieNativeAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
private synchronized void loadAllMetaInfoIntoCacheIfNeeded() throws
IOException {
if (!isMetaInfoLoaded) {
// Load all meta info that are small into cache
- try (HFileReader reader = newHFileReader()) {
+ try (HFileReader reader = readerFactory.createHFileReader()) {
this.numKeyValueEntries = reader.getNumKeyValueEntries();
for (String metaInfoKey : PRELOADED_META_INFO_KEYS) {
Option<byte[]> metaInfo = reader.getMetaInfo(new
UTF8StringKey(metaInfoKey));
@@ -325,29 +310,16 @@ public class HoodieNativeAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
}
}
- private HFileReader newHFileReader() throws IOException {
- SeekableDataInputStream inputStream;
- long fileSize;
- if (path.isPresent()) {
- fileSize = storage.getPathInfo(path.get()).getLength();
- inputStream = storage.openSeekable(path.get(), false);
- } else {
- fileSize = bytesContent.get().length;
- inputStream = new ByteArraySeekableDataInputStream(new
ByteBufferBackedInputStream(bytesContent.get()));
- }
- return new HFileReaderImpl(inputStream, fileSize);
- }
-
public ClosableIterator<IndexedRecord>
getIndexedRecordsByKeysIterator(List<String> sortedKeys,
Schema readerSchema) throws IOException {
- HFileReader reader = newHFileReader();
+ HFileReader reader = readerFactory.createHFileReader();
return new RecordByKeyIterator(reader, sortedKeys, getSchema(),
readerSchema);
}
@Override
public ClosableIterator<IndexedRecord>
getIndexedRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes,
Schema readerSchema) throws IOException {
- HFileReader reader = newHFileReader();
+ HFileReader reader = readerFactory.createHFileReader();
return new RecordByKeyPrefixIterator(reader, sortedKeyPrefixes,
getSchema(), readerSchema);
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHFileReaderFactory.java
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHFileReaderFactory.java
new file mode 100644
index 000000000000..a0a98575d4cd
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHFileReaderFactory.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.io.hfile.HFileReader;
+import org.apache.hudi.io.hfile.HFileReaderImpl;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class TestHFileReaderFactory {
+
+ @Mock
+ private HoodieStorage mockStorage;
+
+ @Mock
+ private StoragePath mockPath;
+
+ @Mock
+ private StoragePathInfo mockPathInfo;
+
+ @Mock
+ private SeekableDataInputStream mockInputStream;
+
+ private TypedProperties properties;
+ private final byte[] testContent = "test content".getBytes();
+
+ @BeforeEach
+ void setUp() {
+ properties = new TypedProperties();
+ }
+
+ @Test
+ void testCreateHFileReader_FileSizeBelowThreshold_ShouldUseContentCache()
throws IOException {
+ final long fileSizeBelow = 5L; // 5 bytes - below default threshold
+ final int thresholdMB = 10; // 10MB threshold
+
+
properties.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
String.valueOf(thresholdMB));
+
+ when(mockStorage.getPathInfo(mockPath)).thenReturn(mockPathInfo);
+ when(mockPathInfo.getLength()).thenReturn(fileSizeBelow);
+ when(mockStorage.openSeekable(mockPath,
false)).thenReturn(mockInputStream);
+ doAnswer(invocation -> {
+ byte[] buffer = invocation.getArgument(0);
+ System.arraycopy(testContent, 0, buffer, 0, Math.min(testContent.length,
buffer.length));
+ return null;
+ }).when(mockInputStream).readFully(any(byte[].class));
+
+ HFileReaderFactory factory = HFileReaderFactory.builder()
+ .withStorage(mockStorage)
+ .withProps(properties)
+ .withPath(mockPath)
+ .build();
+ HFileReader result = factory.createHFileReader();
+
+ assertNotNull(result);
+ assertInstanceOf(HFileReaderImpl.class, result);
+
+ // Verify that content was downloaded (cache was used)
+ verify(mockStorage, times(2)).getPathInfo(mockPath); // Once for size
determination, once for download
+ verify(mockStorage, times(1)).openSeekable(mockPath, false); // For
content download
+ verify(mockInputStream, times(1)).readFully(any(byte[].class));
+ }
+
+ @Test
+ void testCreateHFileReader_FileSizeAboveThreshold_ShouldNotUseContentCache()
throws IOException {
+ final long fileSizeAbove = 15L * 1024L * 1024L; // 15MB - above 10MB
threshold
+ final int thresholdMB = 10; // 10MB threshold
+
+
properties.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
String.valueOf(thresholdMB));
+
+ when(mockStorage.getPathInfo(mockPath)).thenReturn(mockPathInfo);
+ when(mockPathInfo.getLength()).thenReturn(fileSizeAbove);
+ when(mockStorage.openSeekable(mockPath,
false)).thenReturn(mockInputStream);
+
+ HFileReaderFactory factory = HFileReaderFactory.builder()
+ .withStorage(mockStorage)
+ .withProps(properties)
+ .withPath(mockPath)
+ .build();
+ HFileReader result = factory.createHFileReader();
+
+ assertNotNull(result);
+ assertInstanceOf(HFileReaderImpl.class, result);
+
+ // Verify that content was NOT downloaded (cache was not used)
+ verify(mockStorage, times(1)).getPathInfo(mockPath); // Only once for size
determination
+ verify(mockStorage, times(1)).openSeekable(mockPath, false); // For
creating input stream directly
+ verify(mockInputStream, never()).readFully(any(byte[].class)); // Content
not downloaded
+ }
+
+ @Test
+ void
testCreateHFileReader_ContentProvidedInConstructor_ShouldUseProvidedContent()
throws IOException {
+ final int thresholdMB = 10; // 10MB threshold
+
+
properties.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
String.valueOf(thresholdMB));
+
+ HFileReaderFactory factory = HFileReaderFactory.builder()
+ .withStorage(mockStorage)
+ .withProps(properties)
+ .withContent(testContent)
+ .build();
+ HFileReader result = factory.createHFileReader();
+
+ assertNotNull(result);
+ assertInstanceOf(HFileReaderImpl.class, result);
+
+ // Verify that storage was never accessed since content was provided
+ verify(mockStorage, never()).getPathInfo(any());
+ verify(mockStorage, never()).openSeekable(any(), anyBoolean());
+ }
+
+ @Test
+ void testCreateHFileReader_ContentProvidedAndPathProvided_ShouldFail()
throws IOException {
+ final int thresholdMB = 10;
+
+
properties.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
String.valueOf(thresholdMB));
+
+ IllegalStateException exception =
Assertions.assertThrows(IllegalStateException.class, () ->
HFileReaderFactory.builder()
+ .withStorage(mockStorage)
+ .withProps(properties)
+ .withPath(mockPath)
+ .withContent(testContent)
+ .build());
+ assertEquals("HFile source already set, cannot set bytes content",
exception.getMessage());
+
+ exception = Assertions.assertThrows(IllegalStateException.class, () ->
HFileReaderFactory.builder()
+ .withStorage(mockStorage)
+ .withProps(properties)
+ .withContent(testContent)
+ .withPath(mockPath)
+ .build());
+ assertEquals("HFile source already set, cannot set path",
exception.getMessage());
+ }
+
+ @Test
+ void testCreateHFileReader_NoPathOrContent_ShouldThrowException() {
+ IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () -> {
+ HFileReaderFactory.builder()
+ .withStorage(mockStorage)
+ .withProps(properties)
+ .build();
+ });
+ assertEquals("HFile source cannot be null", exception.getMessage());
+ }
+
+ @Test
+ void testBuilder_WithNullStorage_ShouldThrowException() {
+ IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () -> {
+ HFileReaderFactory.builder()
+ .withStorage(null)
+ .withPath(mockPath)
+ .build();
+ });
+ assertEquals("Storage cannot be null", exception.getMessage());
+ }
+
+ @Test
+ void testBuilder_WithoutPropertiesProvided_ShouldUseDefaultProperties()
throws IOException {
+ when(mockStorage.getPathInfo(mockPath)).thenReturn(mockPathInfo);
+ when(mockPathInfo.getLength()).thenReturn(1024L);
+ when(mockStorage.openSeekable(mockPath,
false)).thenReturn(mockInputStream);
+
+ // Not providing properties, should use defaults
+ HFileReaderFactory factory = HFileReaderFactory.builder()
+ .withStorage(mockStorage)
+ .withPath(mockPath)
+ .build();
+
+ HFileReader result = factory.createHFileReader();
+ assertNotNull(result);
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReader.java
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReader.java
index 7b25dfa3f8cd..9d90ea298ad3 100644
---
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReader.java
+++
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReader.java
@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.expression.Expression;
import org.apache.hudi.expression.Predicate;
@@ -38,13 +39,17 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class TestHoodieNativeAvroHFileReader {
+
+ private static final TypedProperties DEFAULT_PROPS = new TypedProperties();
private static HoodieNativeAvroHFileReader reader;
TestHoodieNativeAvroHFileReader() {
HoodieStorage storage = mock(HoodieStorage.class);
StoragePath path = new StoragePath("anyPath");
- reader = new HoodieNativeAvroHFileReader(
- storage, path, Option.empty());
+ HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+ .withStorage(storage).withProps(DEFAULT_PROPS)
+ .withPath(path).build();
+ reader = new HoodieNativeAvroHFileReader(readerFactory, path,
Option.empty());
}
@Test
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
index 21f43bc26aa2..3cf3ed8a4818 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.storage.HFileReaderFactory;
import org.apache.hudi.io.storage.HoodieAvroBootstrapFileReader;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -47,7 +48,10 @@ public class HoodieAvroFileReaderFactory extends
HoodieFileReaderFactory {
protected HoodieFileReader newHFileFileReader(HoodieConfig hoodieConfig,
StoragePath path,
Option<Schema> schemaOption)
throws IOException {
- return new HoodieNativeAvroHFileReader(storage, path, schemaOption);
+ HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+ .withStorage(storage).withProps(hoodieConfig.getProps())
+ .withPath(path).build();
+ return new HoodieNativeAvroHFileReader(readerFactory, path, schemaOption);
}
@Override
@@ -56,7 +60,10 @@ public class HoodieAvroFileReaderFactory extends
HoodieFileReaderFactory {
HoodieStorage storage,
byte[] content,
Option<Schema> schemaOption)
throws IOException {
- return new HoodieNativeAvroHFileReader(this.storage, content,
schemaOption);
+ HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+ .withStorage(storage).withProps(hoodieConfig.getProps())
+ .withContent(content).build();
+ return new HoodieNativeAvroHFileReader(readerFactory, path, schemaOption);
}
@Override
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index 8af1f6b49bfc..260d98a8b64b 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -20,6 +20,7 @@
package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -31,6 +32,7 @@ import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HFileReaderFactory;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -87,6 +89,7 @@ import static org.mockito.Mockito.when;
public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
protected static final int NUM_RECORDS_FIXTURE = 50;
+ protected static final TypedProperties DEFAULT_PROPS = new TypedProperties();
protected static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
return Arrays.stream(new Boolean[][] {
@@ -100,12 +103,18 @@ public class TestHoodieHFileReaderWriter extends
TestHoodieReaderWriterBase {
@Override
protected HoodieAvroFileReader createReader(
HoodieStorage storage) throws Exception {
- return new HoodieNativeAvroHFileReader(storage, getFilePath(),
Option.empty());
+ HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+ .withStorage(storage).withProps(DEFAULT_PROPS)
+ .withPath(getFilePath()).build();
+ return new HoodieNativeAvroHFileReader(readerFactory, getFilePath(),
Option.empty());
}
protected HoodieAvroHFileReaderImplBase createHFileReader(HoodieStorage
storage,
byte[] content)
throws IOException {
- return new HoodieNativeAvroHFileReader(storage, content, Option.empty());
+ HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+ .withStorage(storage).withProps(DEFAULT_PROPS)
+ .withContent(content).build();
+ return new HoodieNativeAvroHFileReader(readerFactory, getFilePath(),
Option.empty());
}
protected void verifyHFileReader(byte[] content,