This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e23f402e194 [HUDI-7347] Introduce SeekableDataInputStream for random access (#10575)
e23f402e194 is described below

commit e23f402e194498088f17142d9f132548ffbbd91d
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Wed Jan 31 16:48:46 2024 -0800

    [HUDI-7347] Introduce SeekableDataInputStream for random access (#10575)
---
 .../hudi/common/table/log/HoodieLogFileReader.java | 36 +++++++++++----
 .../table/log/block/HoodieAvroDataBlock.java       |  4 +-
 .../common/table/log/block/HoodieCDCDataBlock.java |  4 +-
 .../common/table/log/block/HoodieCommandBlock.java |  5 +-
 .../common/table/log/block/HoodieCorruptBlock.java |  5 +-
 .../common/table/log/block/HoodieDataBlock.java    |  4 +-
 .../common/table/log/block/HoodieDeleteBlock.java  |  6 +--
 .../table/log/block/HoodieHFileDataBlock.java      |  4 +-
 .../common/table/log/block/HoodieLogBlock.java     | 16 +++----
 .../table/log/block/HoodieParquetDataBlock.java    |  4 +-
 .../hadoop/fs/HadoopSeekableDataInputStream.java   | 48 ++++++++++++++++++++
 .../apache/hudi/io/SeekableDataInputStream.java    | 53 ++++++++++++++++++++++
 12 files changed, 150 insertions(+), 39 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index cce13c1a6e2..fa8174931c4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -37,9 +37,11 @@ import org.apache.hudi.exception.CorruptedLogFileException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.hadoop.fs.BoundedFsDataInputStream;
+import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
 import org.apache.hudi.hadoop.fs.SchemeAwareFSDataInputStream;
 import org.apache.hudi.hadoop.fs.TimedFSDataInputStream;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.util.IOUtils;
 import org.apache.hudi.storage.StorageSchemes;
 
@@ -90,7 +92,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   private final boolean reverseReader;
   private final boolean enableRecordLookups;
   private boolean closed = false;
-  private FSDataInputStream inputStream;
+  private SeekableDataInputStream inputStream;
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize,
                              boolean readBlockLazily) throws IOException {
@@ -120,7 +122,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
     Path updatedPath = FSUtils.makeQualified(fs, logFile.getPath());
     this.logFile = updatedPath.equals(logFile.getPath()) ? logFile : new 
HoodieLogFile(updatedPath, logFile.getFileSize());
     this.bufferSize = bufferSize;
-    this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
+    this.inputStream = getDataInputStream(fs, this.logFile, bufferSize);
     this.readerSchema = readerSchema;
     this.readBlockLazily = readBlockLazily;
     this.reverseReader = reverseReader;
@@ -202,7 +204,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
         if (nextBlockVersion.getVersion() == 
HoodieLogFormatVersion.DEFAULT_VERSION) {
           return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, 
internalSchema);
         } else {
-          return new HoodieAvroDataBlock(() -> getFSDataInputStream(fs, 
this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
+          return new HoodieAvroDataBlock(() -> getDataInputStream(fs, 
this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
               getTargetReaderSchemaForBlock(), header, footer, keyField);
         }
 
@@ -210,7 +212,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
         checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
             String.format("HFile block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
         return new HoodieHFileDataBlock(
-            () -> getFSDataInputStream(fs, this.logFile, bufferSize), content, 
readBlockLazily, logBlockContentLoc,
+            () -> getDataInputStream(fs, this.logFile, bufferSize), content, 
readBlockLazily, logBlockContentLoc,
             Option.ofNullable(readerSchema), header, footer, 
enableRecordLookups, logFile.getPath(),
             ConfigUtils.getBooleanWithAltKeys(fs.getConf(), 
USE_NATIVE_HFILE_READER));
 
@@ -218,17 +220,17 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
         checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
             String.format("Parquet block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
 
-        return new HoodieParquetDataBlock(() -> getFSDataInputStream(fs, 
this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
+        return new HoodieParquetDataBlock(() -> getDataInputStream(fs, 
this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
             getTargetReaderSchemaForBlock(), header, footer, keyField);
 
       case DELETE_BLOCK:
-        return new HoodieDeleteBlock(content, () -> getFSDataInputStream(fs, 
this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), 
header, footer);
+        return new HoodieDeleteBlock(content, () -> getDataInputStream(fs, 
this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), 
header, footer);
 
       case COMMAND_BLOCK:
-        return new HoodieCommandBlock(content, () -> getFSDataInputStream(fs, 
this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), 
header, footer);
+        return new HoodieCommandBlock(content, () -> getDataInputStream(fs, 
this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), 
header, footer);
 
       case CDC_DATA_BLOCK:
-        return new HoodieCDCDataBlock(() -> getFSDataInputStream(fs, 
this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, 
readerSchema, header, keyField);
+        return new HoodieCDCDataBlock(() -> getDataInputStream(fs, 
this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, 
readerSchema, header, keyField);
 
       default:
         throw new HoodieNotSupportedException("Unsupported Block " + 
blockType);
@@ -270,7 +272,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
     Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, 
corruptedBlockSize, readBlockLazily);
     HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
         new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, 
contentPosition, corruptedBlockSize, nextBlockOffset);
-    return new HoodieCorruptBlock(corruptedBytes, () -> 
getFSDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, 
Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>());
+    return new HoodieCorruptBlock(corruptedBytes, () -> getDataInputStream(fs, 
this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), new 
HashMap<>(), new HashMap<>());
   }
 
   private boolean isBlockCorrupted(int blocksize) throws IOException {
@@ -474,9 +476,23 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
     throw new UnsupportedOperationException("Remove not supported for 
HoodieLogFileReader");
   }
 
+  /**
+   * Fetch the right {@link SeekableDataInputStream} to be used by wrapping 
with required input streams.
+   *
+   * @param fs         instance of {@link FileSystem} in use.
+   * @param bufferSize buffer size to be used.
+   * @return the right {@link SeekableDataInputStream} as required.
+   */
+  private static SeekableDataInputStream getDataInputStream(FileSystem fs,
+                                                            HoodieLogFile 
logFile,
+                                                            int bufferSize) {
+    return new HadoopSeekableDataInputStream(getFSDataInputStream(fs, logFile, 
bufferSize));
+  }
+
   /**
    * Fetch the right {@link FSDataInputStream} to be used by wrapping with 
required input streams.
-   * @param fs instance of {@link FileSystem} in use.
+   *
+   * @param fs         instance of {@link FileSystem} in use.
    * @param bufferSize buffer size to be used.
    * @return the right {@link FSDataInputStream} as required.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index ce3e23d50e7..590d9a17a0d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -29,6 +29,7 @@ import 
org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.io.SeekableDataInputStream;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
@@ -40,7 +41,6 @@ import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
 
 import javax.annotation.Nonnull;
 
@@ -75,7 +75,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
 
   private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
 
-  public HoodieAvroDataBlock(Supplier<FSDataInputStream> inputStreamSupplier,
+  public HoodieAvroDataBlock(Supplier<SeekableDataInputStream> 
inputStreamSupplier,
                              Option<byte[]> content,
                              boolean readBlockLazily,
                              HoodieLogBlockContentLocation 
logBlockContentLocation,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
index 814e75821b5..f32eeb0dfb7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
@@ -20,9 +20,9 @@ package org.apache.hudi.common.table.log.block;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.SeekableDataInputStream;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FSDataInputStream;
 
 import java.util.HashMap;
 import java.util.List;
@@ -35,7 +35,7 @@ import java.util.function.Supplier;
 public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
 
   public HoodieCDCDataBlock(
-      Supplier<FSDataInputStream> inputStreamSupplier,
+      Supplier<SeekableDataInputStream> inputStreamSupplier,
       Option<byte[]> content,
       boolean readBlockLazily,
       HoodieLogBlockContentLocation logBlockContentLocation,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
index ed5338344ad..deeb903cd18 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
@@ -19,8 +19,7 @@
 package org.apache.hudi.common.table.log.block;
 
 import org.apache.hudi.common.util.Option;
-
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -44,7 +43,7 @@ public class HoodieCommandBlock extends HoodieLogBlock {
     this(Option.empty(), null, false, Option.empty(), header, new HashMap<>());
   }
 
-  public HoodieCommandBlock(Option<byte[]> content, 
Supplier<FSDataInputStream> inputStreamSupplier, boolean readBlockLazily,
+  public HoodieCommandBlock(Option<byte[]> content, 
Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily,
                             Option<HoodieLogBlockContentLocation> 
blockContentLocation, Map<HeaderMetadataType, String> header,
                             Map<HeaderMetadataType, String> footer) {
     super(header, footer, blockContentLocation, content, inputStreamSupplier, 
readBlockLazily);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
index 928ae780ee6..19d704c2595 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
@@ -19,8 +19,7 @@
 package org.apache.hudi.common.table.log.block;
 
 import org.apache.hudi.common.util.Option;
-
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
 
 import java.io.IOException;
 import java.util.Map;
@@ -32,7 +31,7 @@ import java.util.function.Supplier;
  */
 public class HoodieCorruptBlock extends HoodieLogBlock {
 
-  public HoodieCorruptBlock(Option<byte[]> corruptedBytes, 
Supplier<FSDataInputStream> inputStreamSupplier, boolean readBlockLazily,
+  public HoodieCorruptBlock(Option<byte[]> corruptedBytes, 
Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily,
                             Option<HoodieLogBlockContentLocation> 
blockContentLocation, Map<HeaderMetadataType, String> header,
                             Map<HeaderMetadataType, String> footer) {
     super(header, footer, blockContentLocation, corruptedBytes, 
inputStreamSupplier, readBlockLazily);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index ec4d548de23..1b024a3b530 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -24,9 +24,9 @@ import 
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.SeekableDataInputStream;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,7 +110,7 @@ public abstract class HoodieDataBlock extends 
HoodieLogBlock {
    * NOTE: This ctor is used on the write-path (ie when records ought to be 
written into the log)
    */
   protected HoodieDataBlock(Option<byte[]> content,
-                            Supplier<FSDataInputStream> inputStreamSupplier,
+                            Supplier<SeekableDataInputStream> 
inputStreamSupplier,
                             boolean readBlockLazily,
                             Option<HoodieLogBlockContentLocation> 
blockContentLocation,
                             Option<Schema> readerSchema,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index b0b1a76d42d..a55f4f1e623 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SerializationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.util.Lazy;
 
 import org.apache.avro.io.BinaryDecoder;
@@ -37,7 +38,6 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,14 +97,14 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
     this.recordsToDelete = 
recordsToDelete.stream().map(Pair::getLeft).toArray(DeleteRecord[]::new);
   }
 
-  public HoodieDeleteBlock(Option<byte[]> content, Supplier<FSDataInputStream> 
inputStreamSupplier, boolean readBlockLazily,
+  public HoodieDeleteBlock(Option<byte[]> content, 
Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily,
                            Option<HoodieLogBlockContentLocation> 
blockContentLocation, Map<HeaderMetadataType, String> header,
                            Map<HeaderMetadataType, String> footer) {
     // Setting `shouldWriteRecordPositions` to false as this constructor is 
only used by the reader
     this(content, inputStreamSupplier, readBlockLazily, blockContentLocation, 
header, footer, false);
   }
 
-  HoodieDeleteBlock(Option<byte[]> content, Supplier<FSDataInputStream> 
inputStreamSupplier, boolean readBlockLazily,
+  HoodieDeleteBlock(Option<byte[]> content, Supplier<SeekableDataInputStream> 
inputStreamSupplier, boolean readBlockLazily,
                     Option<HoodieLogBlockContentLocation> 
blockContentLocation, Map<HeaderMetadataType, String> header,
                     Map<HeaderMetadataType, String> footer, boolean 
shouldWriteRecordPositions) {
     super(header, footer, blockContentLocation, content, inputStreamSupplier, 
readBlockLazily);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 86cfe91f0f7..fa447063aa4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -33,6 +33,7 @@ import 
org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -42,7 +43,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -83,7 +83,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
   private final Path pathForReader;
   private final HoodieConfig hFileReaderConfig;
 
-  public HoodieHFileDataBlock(Supplier<FSDataInputStream> inputStreamSupplier,
+  public HoodieHFileDataBlock(Supplier<SeekableDataInputStream> 
inputStreamSupplier,
                               Option<byte[]> content,
                               boolean readBlockLazily,
                               HoodieLogBlockContentLocation 
logBlockContentLocation,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 2821c132d6b..b9c1063083e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -25,9 +25,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.TypeUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.SeekableDataInputStream;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +36,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -68,10 +67,7 @@ public abstract class HoodieLogBlock {
   private final Option<HoodieLogBlockContentLocation> blockContentLocation;
   // data for a specific block
   private Option<byte[]> content;
-  // TODO : change this to just InputStream so this works for any FileSystem
-  // create handlers to return specific type of inputstream based on FS
-  // input stream corresponding to the log file where this logBlock belongs
-  private final Supplier<FSDataInputStream> inputStreamSupplier;
+  private final Supplier<SeekableDataInputStream> inputStreamSupplier;
   // Toggle flag, whether to read blocks lazily (I/O intensive) or not (Memory 
intensive)
   protected boolean readBlockLazily;
 
@@ -80,7 +76,7 @@ public abstract class HoodieLogBlock {
       @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
       @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation,
       @Nonnull Option<byte[]> content,
-      @Nullable Supplier<FSDataInputStream> inputStreamSupplier,
+      @Nullable Supplier<SeekableDataInputStream> inputStreamSupplier,
       boolean readBlockLazily) {
     this.logBlockHeader = logBlockHeader;
     this.logBlockFooter = logBlockFooter;
@@ -265,7 +261,7 @@ public abstract class HoodieLogBlock {
   /**
    * Convert bytes to LogMetadata, follow the same order as {@link 
HoodieLogBlock#getLogMetadataBytes}.
    */
-  public static Map<HeaderMetadataType, String> getLogMetadata(DataInputStream 
dis) throws IOException {
+  public static Map<HeaderMetadataType, String> 
getLogMetadata(SeekableDataInputStream dis) throws IOException {
 
     Map<HeaderMetadataType, String> metadata = new HashMap<>();
     // 1. Read the metadata written out
@@ -289,7 +285,7 @@ public abstract class HoodieLogBlock {
    * Read or Skip block content of a log block in the log file. Depends on 
lazy reading enabled in
    * {@link HoodieMergedLogRecordScanner}
    */
-  public static Option<byte[]> tryReadContent(FSDataInputStream inputStream, 
Integer contentLength, boolean readLazily)
+  public static Option<byte[]> tryReadContent(SeekableDataInputStream 
inputStream, Integer contentLength, boolean readLazily)
       throws IOException {
     if (readLazily) {
       // Seek to the end of the content block
@@ -311,7 +307,7 @@ public abstract class HoodieLogBlock {
     checkState(!content.isPresent(), "Block has already been inflated");
     checkState(inputStreamSupplier != null, "Block should have input-stream 
provided");
 
-    try (FSDataInputStream inputStream = inputStreamSupplier.get()) {
+    try (SeekableDataInputStream inputStream = inputStreamSupplier.get()) {
       content = Option.of(new byte[(int) 
this.getBlockContentLocation().get().getBlockSize()]);
       
inputStream.seek(this.getBlockContentLocation().get().getContentPositionInLogFile());
       inputStream.readFully(content.get(), 0, content.get().length);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 1b8880c4a0d..485d70890b4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -27,13 +27,13 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetWriter;
@@ -64,7 +64,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
   private final Option<Double> expectedCompressionRatio;
   private final Option<Boolean> useDictionaryEncoding;
 
-  public HoodieParquetDataBlock(Supplier<FSDataInputStream> 
inputStreamSupplier,
+  public HoodieParquetDataBlock(Supplier<SeekableDataInputStream> 
inputStreamSupplier,
                                 Option<byte[]> content,
                                 boolean readBlockLazily,
                                 HoodieLogBlockContentLocation 
logBlockContentLocation,
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopSeekableDataInputStream.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopSeekableDataInputStream.java
new file mode 100644
index 00000000000..ae10ca0ac3f
--- /dev/null
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopSeekableDataInputStream.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.fs;
+
+import org.apache.hudi.io.SeekableDataInputStream;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link SeekableDataInputStream} based on Hadoop's 
{@link FSDataInputStream}
+ */
+public class HadoopSeekableDataInputStream extends SeekableDataInputStream {
+  private final FSDataInputStream stream;
+
+  public HadoopSeekableDataInputStream(FSDataInputStream stream) {
+    super(stream);
+    this.stream = stream;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return stream.getPos();
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    stream.seek(pos);
+  }
+}
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/SeekableDataInputStream.java 
b/hudi-io/src/main/java/org/apache/hudi/io/SeekableDataInputStream.java
new file mode 100644
index 00000000000..c76fd3be32d
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/SeekableDataInputStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link InputStream} that supports random access by allowing to seek to
+ * an arbitrary position within the stream and read the content.
+ */
+public abstract class SeekableDataInputStream extends DataInputStream {
+  /**
+   * Creates a DataInputStream that uses the specified
+   * underlying InputStream.
+   *
+   * @param in the specified input stream
+   */
+  public SeekableDataInputStream(InputStream in) {
+    super(in);
+  }
+
+  /**
+   * @return current position of the stream. The next read() will be from that 
location.
+   */
+  public abstract long getPos() throws IOException;
+
+  /**
+   * Seeks to a position within the stream.
+   *
+   * @param pos target position to seek to.
+   * @throws IOException upon error.
+   */
+  public abstract void seek(long pos) throws IOException;
+}

Reply via email to