This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f6925976de [blob] Extract BlobFileMeta from BlobFormatReader
f6925976de is described below

commit f6925976ded2d0c1dee7c5bd9d5af54ffa418a39
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jan 9 14:46:06 2026 +0800

    [blob] Extract BlobFileMeta from BlobFormatReader
---
 .../apache/paimon/format/blob/BlobFileFormat.java  | 24 ++++--
 .../{BlobFormatReader.java => BlobFileMeta.java}   | 98 +++-------------------
 .../paimon/format/blob/BlobFormatReader.java       | 82 +++---------------
 3 files changed, 44 insertions(+), 160 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
index dcf2e90fc9..78f06b486b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -25,12 +25,16 @@ import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IOUtils;
 
 import javax.annotation.Nullable;
 
@@ -104,12 +108,20 @@ public class BlobFileFormat extends FileFormat {
 
         @Override
         public FileRecordReader<InternalRow> createReader(Context context) 
throws IOException {
-            return new BlobFormatReader(
-                    context.fileIO(),
-                    context.filePath(),
-                    context.fileSize(),
-                    context.selection(),
-                    blobAsDescriptor);
+            FileIO fileIO = context.fileIO();
+            Path filePath = context.filePath();
+            SeekableInputStream in = null;
+            BlobFileMeta fileMeta;
+            try {
+                in = fileIO.newInputStream(filePath);
+                fileMeta = new BlobFileMeta(in, context.fileSize(), 
context.selection());
+            } finally {
+                if (blobAsDescriptor) {
+                    IOUtils.closeQuietly(in);
+                    in = null;
+                }
+            }
+            return new BlobFormatReader(fileIO, filePath, fileMeta, in);
         }
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
similarity index 50%
copy from paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
copy to paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
index f73c5431be..6fde55d485 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
@@ -18,15 +18,8 @@
 
 package org.apache.paimon.format.blob;
 
-import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.memory.BytesUtils;
-import org.apache.paimon.reader.FileRecordIterator;
-import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.utils.DeltaVarintCompressor;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.RoaringBitmap32;
@@ -36,32 +29,15 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Iterator;
 
-/** {@link FileRecordReader} for blob file. */
-public class BlobFormatReader implements FileRecordReader<InternalRow> {
+/** Metadata of blob file. */
+public class BlobFileMeta {
 
-    private final FileIO fileIO;
-    private final Path filePath;
     private final long[] blobLengths;
     private final long[] blobOffsets;
-    private final int[] returnedPositions;
-    private final SeekableInputStream in;
-    private final boolean blobAsDescriptor;
+    private final @Nullable int[] returnedPositions;
 
-    private boolean returned;
-
-    public BlobFormatReader(
-            FileIO fileIO,
-            Path filePath,
-            long fileSize,
-            @Nullable RoaringBitmap32 selection,
-            boolean blobAsDescriptor)
+    public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable 
RoaringBitmap32 selection)
             throws IOException {
-        this.fileIO = fileIO;
-        this.filePath = filePath;
-        this.returned = false;
-        this.blobAsDescriptor = blobAsDescriptor;
-        this.in = fileIO.newInputStream(filePath);
-
         in.seek(fileSize - 5);
         byte[] header = new byte[5];
         IOUtils.readFully(in, header);
@@ -105,67 +81,19 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
         this.blobOffsets = blobOffsets;
     }
 
-    @Nullable
-    @Override
-    public FileRecordIterator<InternalRow> readBatch() throws IOException {
-        if (returned) {
-            return null;
-        }
-
-        returned = true;
-        return new FileRecordIterator<InternalRow>() {
-
-            int currentPosition = 0;
-
-            @Override
-            public long returnedPosition() {
-                return returnedPositions == null
-                        ? currentPosition
-                        : returnedPositions[currentPosition - 1];
-            }
-
-            @Override
-            public Path filePath() {
-                return filePath;
-            }
-
-            @Nullable
-            @Override
-            public InternalRow next() {
-                if (currentPosition >= blobLengths.length) {
-                    return null;
-                }
-
-                Blob blob;
-                long offset = blobOffsets[currentPosition] + 4;
-                long length = blobLengths[currentPosition] - 16;
-                if (!blobAsDescriptor) {
-                    blob = Blob.fromData(readInlineBlob(offset, length));
-                } else {
-                    blob = Blob.fromFile(fileIO, filePath.toString(), offset, 
length);
-                }
-                currentPosition++;
-                return GenericRow.of(blob);
-            }
+    public long blobLength(int i) {
+        return blobLengths[i];
+    }
 
-            @Override
-            public void releaseBatch() {}
-        };
+    public long blobOffset(int i) {
+        return blobOffsets[i];
     }
 
-    @Override
-    public void close() throws IOException {
-        in.close();
+    public int returnedPosition(int i) {
+        return returnedPositions == null ? i : returnedPositions[i - 1];
     }
 
-    private byte[] readInlineBlob(long position, long length) {
-        byte[] blobData = new byte[(int) length];
-        try {
-            in.seek(position);
-            IOUtils.readFully(in, blobData);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return blobData;
+    public int recordNumber() {
+        return blobLengths.length;
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index f73c5431be..bed455fda7 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -24,85 +24,31 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.memory.BytesUtils;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.FileRecordReader;
-import org.apache.paimon.utils.DeltaVarintCompressor;
 import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.RoaringBitmap32;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 /** {@link FileRecordReader} for blob file. */
 public class BlobFormatReader implements FileRecordReader<InternalRow> {
 
     private final FileIO fileIO;
     private final Path filePath;
-    private final long[] blobLengths;
-    private final long[] blobOffsets;
-    private final int[] returnedPositions;
-    private final SeekableInputStream in;
-    private final boolean blobAsDescriptor;
+    private final BlobFileMeta fileMeta;
+    private final @Nullable SeekableInputStream in;
 
     private boolean returned;
 
     public BlobFormatReader(
-            FileIO fileIO,
-            Path filePath,
-            long fileSize,
-            @Nullable RoaringBitmap32 selection,
-            boolean blobAsDescriptor)
-            throws IOException {
+            FileIO fileIO, Path filePath, BlobFileMeta fileMeta, @Nullable 
SeekableInputStream in) {
         this.fileIO = fileIO;
         this.filePath = filePath;
+        this.fileMeta = fileMeta;
+        this.in = in;
         this.returned = false;
-        this.blobAsDescriptor = blobAsDescriptor;
-        this.in = fileIO.newInputStream(filePath);
-
-        in.seek(fileSize - 5);
-        byte[] header = new byte[5];
-        IOUtils.readFully(in, header);
-        byte version = header[4];
-        if (version != 1) {
-            throw new IOException("Unsupported version: " + version);
-        }
-        int indexLength = BytesUtils.getInt(header, 0);
-
-        in.seek(fileSize - 5 - indexLength);
-        byte[] indexBytes = new byte[indexLength];
-        IOUtils.readFully(in, indexBytes);
-
-        long[] blobLengths = DeltaVarintCompressor.decompress(indexBytes);
-        long[] blobOffsets = new long[blobLengths.length];
-        long offset = 0;
-        for (int i = 0; i < blobLengths.length; i++) {
-            blobOffsets[i] = offset;
-            offset += blobLengths[i];
-        }
-
-        int[] returnedPositions = null;
-        if (selection != null) {
-            int cardinality = (int) selection.getCardinality();
-            returnedPositions = new int[cardinality];
-            long[] newLengths = new long[cardinality];
-            long[] newOffsets = new long[cardinality];
-            Iterator<Integer> iterator = selection.iterator();
-            for (int i = 0; i < cardinality; i++) {
-                Integer next = iterator.next();
-                newLengths[i] = blobLengths[next];
-                newOffsets[i] = blobOffsets[next];
-                returnedPositions[i] = next;
-            }
-            blobLengths = newLengths;
-            blobOffsets = newOffsets;
-        }
-
-        this.returnedPositions = returnedPositions;
-        this.blobLengths = blobLengths;
-        this.blobOffsets = blobOffsets;
     }
 
     @Nullable
@@ -119,9 +65,7 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
 
             @Override
             public long returnedPosition() {
-                return returnedPositions == null
-                        ? currentPosition
-                        : returnedPositions[currentPosition - 1];
+                return fileMeta.returnedPosition(currentPosition);
             }
 
             @Override
@@ -132,15 +76,15 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
             @Nullable
             @Override
             public InternalRow next() {
-                if (currentPosition >= blobLengths.length) {
+                if (currentPosition >= fileMeta.recordNumber()) {
                     return null;
                 }
 
                 Blob blob;
-                long offset = blobOffsets[currentPosition] + 4;
-                long length = blobLengths[currentPosition] - 16;
-                if (!blobAsDescriptor) {
-                    blob = Blob.fromData(readInlineBlob(offset, length));
+                long offset = fileMeta.blobOffset(currentPosition) + 4;
+                long length = fileMeta.blobLength(currentPosition) - 16;
+                if (in != null) {
+                    blob = Blob.fromData(readInlineBlob(in, offset, length));
                 } else {
                     blob = Blob.fromFile(fileIO, filePath.toString(), offset, 
length);
                 }
@@ -155,10 +99,10 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
 
     @Override
     public void close() throws IOException {
-        in.close();
+        IOUtils.closeQuietly(in);
     }
 
-    private byte[] readInlineBlob(long position, long length) {
+    private static byte[] readInlineBlob(SeekableInputStream in, long 
position, long length) {
         byte[] blobData = new byte[(int) length];
         try {
             in.seek(position);

Reply via email to