This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new e23f402e194 [HUDI-7347] Introduce SeekableDataInputStream for random access (#10575) e23f402e194 is described below commit e23f402e194498088f17142d9f132548ffbbd91d Author: Y Ethan Guo <ethan.guoyi...@gmail.com> AuthorDate: Wed Jan 31 16:48:46 2024 -0800 [HUDI-7347] Introduce SeekableDataInputStream for random access (#10575) --- .../hudi/common/table/log/HoodieLogFileReader.java | 36 +++++++++++---- .../table/log/block/HoodieAvroDataBlock.java | 4 +- .../common/table/log/block/HoodieCDCDataBlock.java | 4 +- .../common/table/log/block/HoodieCommandBlock.java | 5 +- .../common/table/log/block/HoodieCorruptBlock.java | 5 +- .../common/table/log/block/HoodieDataBlock.java | 4 +- .../common/table/log/block/HoodieDeleteBlock.java | 6 +-- .../table/log/block/HoodieHFileDataBlock.java | 4 +- .../common/table/log/block/HoodieLogBlock.java | 16 +++---- .../table/log/block/HoodieParquetDataBlock.java | 4 +- .../hadoop/fs/HadoopSeekableDataInputStream.java | 48 ++++++++++++++++++++ .../apache/hudi/io/SeekableDataInputStream.java | 53 ++++++++++++++++++++++ 12 files changed, 150 insertions(+), 39 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index cce13c1a6e2..fa8174931c4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -37,9 +37,11 @@ import org.apache.hudi.exception.CorruptedLogFileException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.hadoop.fs.BoundedFsDataInputStream; +import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream; import org.apache.hudi.hadoop.fs.SchemeAwareFSDataInputStream; import 
org.apache.hudi.hadoop.fs.TimedFSDataInputStream; import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.io.util.IOUtils; import org.apache.hudi.storage.StorageSchemes; @@ -90,7 +92,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private final boolean reverseReader; private final boolean enableRecordLookups; private boolean closed = false; - private FSDataInputStream inputStream; + private SeekableDataInputStream inputStream; public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily) throws IOException { @@ -120,7 +122,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { Path updatedPath = FSUtils.makeQualified(fs, logFile.getPath()); this.logFile = updatedPath.equals(logFile.getPath()) ? logFile : new HoodieLogFile(updatedPath, logFile.getFileSize()); this.bufferSize = bufferSize; - this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize); + this.inputStream = getDataInputStream(fs, this.logFile, bufferSize); this.readerSchema = readerSchema; this.readBlockLazily = readBlockLazily; this.reverseReader = reverseReader; @@ -202,7 +204,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) { return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, internalSchema); } else { - return new HoodieAvroDataBlock(() -> getFSDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, + return new HoodieAvroDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, getTargetReaderSchemaForBlock(), header, footer, keyField); } @@ -210,7 +212,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION, String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION)); return new HoodieHFileDataBlock( - () -> getFSDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, + () -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath(), ConfigUtils.getBooleanWithAltKeys(fs.getConf(), USE_NATIVE_HFILE_READER)); @@ -218,17 +220,17 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION, String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION)); - return new HoodieParquetDataBlock(() -> getFSDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, + return new HoodieParquetDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, getTargetReaderSchemaForBlock(), header, footer, keyField); case DELETE_BLOCK: - return new HoodieDeleteBlock(content, () -> getFSDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), header, footer); + return new HoodieDeleteBlock(content, () -> getDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), header, footer); case COMMAND_BLOCK: - return new HoodieCommandBlock(content, () -> getFSDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), header, footer); + return new HoodieCommandBlock(content, () -> getDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), header, footer); case CDC_DATA_BLOCK: - return new HoodieCDCDataBlock(() -> getFSDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, 
logBlockContentLoc, readerSchema, header, keyField); + return new HoodieCDCDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, readerSchema, header, keyField); default: throw new HoodieNotSupportedException("Unsupported Block " + blockType); @@ -270,7 +272,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily); HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, contentPosition, corruptedBlockSize, nextBlockOffset); - return new HoodieCorruptBlock(corruptedBytes, () -> getFSDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>()); + return new HoodieCorruptBlock(corruptedBytes, () -> getDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>()); } private boolean isBlockCorrupted(int blocksize) throws IOException { @@ -474,9 +476,23 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader"); } + /** + * Fetch the right {@link SeekableDataInputStream} to be used by wrapping with required input streams. + * + * @param fs instance of {@link FileSystem} in use. + * @param bufferSize buffer size to be used. + * @return the right {@link SeekableDataInputStream} as required. + */ + private static SeekableDataInputStream getDataInputStream(FileSystem fs, + HoodieLogFile logFile, + int bufferSize) { + return new HadoopSeekableDataInputStream(getFSDataInputStream(fs, logFile, bufferSize)); + } + /** * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams. - * @param fs instance of {@link FileSystem} in use. 
+ * + * @param fs instance of {@link FileSystem} in use. * @param bufferSize buffer size to be used. * @return the right {@link FSDataInputStream} as required. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index ce3e23d50e7..590d9a17a0d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -40,7 +41,6 @@ import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; -import org.apache.hadoop.fs.FSDataInputStream; import javax.annotation.Nonnull; @@ -75,7 +75,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock { private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>(); - public HoodieAvroDataBlock(Supplier<FSDataInputStream> inputStreamSupplier, + public HoodieAvroDataBlock(Supplier<SeekableDataInputStream> inputStreamSupplier, Option<byte[]> content, boolean readBlockLazily, HoodieLogBlockContentLocation logBlockContentLocation, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java index 814e75821b5..f32eeb0dfb7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java @@ -20,9 +20,9 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.avro.Schema; -import org.apache.hadoop.fs.FSDataInputStream; import java.util.HashMap; import java.util.List; @@ -35,7 +35,7 @@ import java.util.function.Supplier; public class HoodieCDCDataBlock extends HoodieAvroDataBlock { public HoodieCDCDataBlock( - Supplier<FSDataInputStream> inputStreamSupplier, + Supplier<SeekableDataInputStream> inputStreamSupplier, Option<byte[]> content, boolean readBlockLazily, HoodieLogBlockContentLocation logBlockContentLocation, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java index ed5338344ad..deeb903cd18 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java @@ -19,8 +19,7 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.common.util.Option; - -import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hudi.io.SeekableDataInputStream; import java.util.HashMap; import java.util.Map; @@ -44,7 +43,7 @@ public class HoodieCommandBlock extends HoodieLogBlock { this(Option.empty(), null, false, Option.empty(), header, new HashMap<>()); } - public HoodieCommandBlock(Option<byte[]> content, Supplier<FSDataInputStream> inputStreamSupplier, boolean readBlockLazily, + public HoodieCommandBlock(Option<byte[]> content, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily, Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, 
String> footer) { super(header, footer, blockContentLocation, content, inputStreamSupplier, readBlockLazily); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java index 928ae780ee6..19d704c2595 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java @@ -19,8 +19,7 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.common.util.Option; - -import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hudi.io.SeekableDataInputStream; import java.io.IOException; import java.util.Map; @@ -32,7 +31,7 @@ import java.util.function.Supplier; */ public class HoodieCorruptBlock extends HoodieLogBlock { - public HoodieCorruptBlock(Option<byte[]> corruptedBytes, Supplier<FSDataInputStream> inputStreamSupplier, boolean readBlockLazily, + public HoodieCorruptBlock(Option<byte[]> corruptedBytes, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily, Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) { super(header, footer, blockContentLocation, corruptedBytes, inputStreamSupplier, readBlockLazily); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java index ec4d548de23..1b024a3b530 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java @@ -24,9 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.util.Option; import 
org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.avro.Schema; -import org.apache.hadoop.fs.FSDataInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,7 +110,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { * NOTE: This ctor is used on the write-path (ie when records ought to be written into the log) */ protected HoodieDataBlock(Option<byte[]> content, - Supplier<FSDataInputStream> inputStreamSupplier, + Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily, Option<HoodieLogBlockContentLocation> blockContentLocation, Option<Schema> readerSchema, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java index b0b1a76d42d..a55f4f1e623 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SerializationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.util.Lazy; import org.apache.avro.io.BinaryDecoder; @@ -37,7 +38,6 @@ import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.hadoop.fs.FSDataInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,14 +97,14 @@ public class HoodieDeleteBlock extends HoodieLogBlock { this.recordsToDelete = recordsToDelete.stream().map(Pair::getLeft).toArray(DeleteRecord[]::new); 
} - public HoodieDeleteBlock(Option<byte[]> content, Supplier<FSDataInputStream> inputStreamSupplier, boolean readBlockLazily, + public HoodieDeleteBlock(Option<byte[]> content, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily, Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) { // Setting `shouldWriteRecordPositions` to false as this constructor is only used by the reader this(content, inputStreamSupplier, readBlockLazily, blockContentLocation, header, footer, false); } - HoodieDeleteBlock(Option<byte[]> content, Supplier<FSDataInputStream> inputStreamSupplier, boolean readBlockLazily, + HoodieDeleteBlock(Option<byte[]> content, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily, Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer, boolean shouldWriteRecordPositions) { super(header, footer, blockContentLocation, content, inputStreamSupplier, readBlockLazily); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 86cfe91f0f7..fa447063aa4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; import org.apache.hudi.io.storage.HoodieFileReader; import 
org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -42,7 +43,6 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -83,7 +83,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { private final Path pathForReader; private final HoodieConfig hFileReaderConfig; - public HoodieHFileDataBlock(Supplier<FSDataInputStream> inputStreamSupplier, + public HoodieHFileDataBlock(Supplier<SeekableDataInputStream> inputStreamSupplier, Option<byte[]> content, boolean readBlockLazily, HoodieLogBlockContentLocation logBlockContentLocation, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java index 2821c132d6b..b9c1063083e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java @@ -25,9 +25,9 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.TypeUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +36,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; @@ -68,10 +67,7 @@ public abstract class 
HoodieLogBlock { private final Option<HoodieLogBlockContentLocation> blockContentLocation; // data for a specific block private Option<byte[]> content; - // TODO : change this to just InputStream so this works for any FileSystem - // create handlers to return specific type of inputstream based on FS - // input stream corresponding to the log file where this logBlock belongs - private final Supplier<FSDataInputStream> inputStreamSupplier; + private final Supplier<SeekableDataInputStream> inputStreamSupplier; // Toggle flag, whether to read blocks lazily (I/O intensive) or not (Memory intensive) protected boolean readBlockLazily; @@ -80,7 +76,7 @@ public abstract class HoodieLogBlock { @Nonnull Map<HeaderMetadataType, String> logBlockFooter, @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content, - @Nullable Supplier<FSDataInputStream> inputStreamSupplier, + @Nullable Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily) { this.logBlockHeader = logBlockHeader; this.logBlockFooter = logBlockFooter; @@ -265,7 +261,7 @@ public abstract class HoodieLogBlock { /** * Convert bytes to LogMetadata, follow the same order as {@link HoodieLogBlock#getLogMetadataBytes}. */ - public static Map<HeaderMetadataType, String> getLogMetadata(DataInputStream dis) throws IOException { + public static Map<HeaderMetadataType, String> getLogMetadata(SeekableDataInputStream dis) throws IOException { Map<HeaderMetadataType, String> metadata = new HashMap<>(); // 1. Read the metadata written out @@ -289,7 +285,7 @@ public abstract class HoodieLogBlock { * Read or Skip block content of a log block in the log file. 
Depends on lazy reading enabled in * {@link HoodieMergedLogRecordScanner} */ - public static Option<byte[]> tryReadContent(FSDataInputStream inputStream, Integer contentLength, boolean readLazily) + public static Option<byte[]> tryReadContent(SeekableDataInputStream inputStream, Integer contentLength, boolean readLazily) throws IOException { if (readLazily) { // Seek to the end of the content block @@ -311,7 +307,7 @@ public abstract class HoodieLogBlock { checkState(!content.isPresent(), "Block has already been inflated"); checkState(inputStreamSupplier != null, "Block should have input-stream provided"); - try (FSDataInputStream inputStream = inputStreamSupplier.get()) { + try (SeekableDataInputStream inputStream = inputStreamSupplier.get()) { content = Option.of(new byte[(int) this.getBlockContentLocation().get().getBlockSize()]); inputStream.seek(this.getBlockContentLocation().get().getContentPositionInLogFile()); inputStream.readFully(content.get(), 0, content.get().length); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java index 1b8880c4a0d..485d70890b4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java @@ -27,13 +27,13 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetWriter; @@ -64,7 +64,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { private final Option<Double> expectedCompressionRatio; private final Option<Boolean> useDictionaryEncoding; - public HoodieParquetDataBlock(Supplier<FSDataInputStream> inputStreamSupplier, + public HoodieParquetDataBlock(Supplier<SeekableDataInputStream> inputStreamSupplier, Option<byte[]> content, boolean readBlockLazily, HoodieLogBlockContentLocation logBlockContentLocation, diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopSeekableDataInputStream.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopSeekableDataInputStream.java new file mode 100644 index 00000000000..ae10ca0ac3f --- /dev/null +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopSeekableDataInputStream.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.hadoop.fs; + +import org.apache.hudi.io.SeekableDataInputStream; + +import org.apache.hadoop.fs.FSDataInputStream; + +import java.io.IOException; + +/** + * An implementation of {@link SeekableDataInputStream} based on Hadoop's {@link FSDataInputStream} + */ +public class HadoopSeekableDataInputStream extends SeekableDataInputStream { + private final FSDataInputStream stream; + + public HadoopSeekableDataInputStream(FSDataInputStream stream) { + super(stream); + this.stream = stream; + } + + @Override + public long getPos() throws IOException { + return stream.getPos(); + } + + @Override + public void seek(long pos) throws IOException { + stream.seek(pos); + } +} diff --git a/hudi-io/src/main/java/org/apache/hudi/io/SeekableDataInputStream.java b/hudi-io/src/main/java/org/apache/hudi/io/SeekableDataInputStream.java new file mode 100644 index 00000000000..c76fd3be32d --- /dev/null +++ b/hudi-io/src/main/java/org/apache/hudi/io/SeekableDataInputStream.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.io; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * An {@link InputStream} that supports random access by allowing callers to seek to + * an arbitrary position within the stream and read the content. + */ +public abstract class SeekableDataInputStream extends DataInputStream { + /** + * Creates a DataInputStream that uses the specified + * underlying InputStream. + * + * @param in the specified input stream + */ + public SeekableDataInputStream(InputStream in) { + super(in); + } + + /** + * @return current position of the stream. The next read() will be from that location. + */ + public abstract long getPos() throws IOException; + + /** + * Seeks to a position within the stream. + * + * @param pos target position to seek to. + * @throws IOException upon error. + */ + public abstract void seek(long pos) throws IOException; +}