This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f6925976de [blob] Extract BlobFileMeta from BlobFormatReader
f6925976de is described below
commit f6925976ded2d0c1dee7c5bd9d5af54ffa418a39
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jan 9 14:46:06 2026 +0800
[blob] Extract BlobFileMeta from BlobFormatReader
---
.../apache/paimon/format/blob/BlobFileFormat.java | 24 ++++--
.../{BlobFormatReader.java => BlobFileMeta.java} | 98 +++-------------------
.../paimon/format/blob/BlobFormatReader.java | 82 +++---------------
3 files changed, 44 insertions(+), 160 deletions(-)
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
index dcf2e90fc9..78f06b486b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -25,12 +25,16 @@ import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IOUtils;
import javax.annotation.Nullable;
@@ -104,12 +108,20 @@ public class BlobFileFormat extends FileFormat {
@Override
public FileRecordReader<InternalRow> createReader(Context context)
throws IOException {
- return new BlobFormatReader(
- context.fileIO(),
- context.filePath(),
- context.fileSize(),
- context.selection(),
- blobAsDescriptor);
+ FileIO fileIO = context.fileIO();
+ Path filePath = context.filePath();
+ SeekableInputStream in = fileIO.newInputStream(filePath);
+ // 'in' is handed to the reader only in inline mode; otherwise, or on failure, close it.
+ boolean keepOpen = false;
+ try {
+ BlobFileMeta meta = new BlobFileMeta(in, context.fileSize(), context.selection());
+ keepOpen = !blobAsDescriptor; // descriptor blobs read via fileIO/filePath, not 'in'
+ return new BlobFormatReader(fileIO, filePath, meta, keepOpen ? in : null);
+ } finally {
+ if (!keepOpen) {
+ IOUtils.closeQuietly(in);
+ }
+ }
}
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
similarity index 50%
copy from paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
copy to paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
index f73c5431be..6fde55d485 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
@@ -18,15 +18,8 @@
package org.apache.paimon.format.blob;
-import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.memory.BytesUtils;
-import org.apache.paimon.reader.FileRecordIterator;
-import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.utils.DeltaVarintCompressor;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.RoaringBitmap32;
@@ -36,32 +29,15 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
-/** {@link FileRecordReader} for blob file. */
-public class BlobFormatReader implements FileRecordReader<InternalRow> {
+/** Metadata of blob file. */
+public class BlobFileMeta {
- private final FileIO fileIO;
- private final Path filePath;
private final long[] blobLengths;
private final long[] blobOffsets;
- private final int[] returnedPositions;
- private final SeekableInputStream in;
- private final boolean blobAsDescriptor;
+ private final @Nullable int[] returnedPositions;
- private boolean returned;
-
- public BlobFormatReader(
- FileIO fileIO,
- Path filePath,
- long fileSize,
- @Nullable RoaringBitmap32 selection,
- boolean blobAsDescriptor)
+ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable
RoaringBitmap32 selection)
throws IOException {
- this.fileIO = fileIO;
- this.filePath = filePath;
- this.returned = false;
- this.blobAsDescriptor = blobAsDescriptor;
- this.in = fileIO.newInputStream(filePath);
-
in.seek(fileSize - 5);
byte[] header = new byte[5];
IOUtils.readFully(in, header);
@@ -105,67 +81,19 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
this.blobOffsets = blobOffsets;
}
- @Nullable
- @Override
- public FileRecordIterator<InternalRow> readBatch() throws IOException {
- if (returned) {
- return null;
- }
-
- returned = true;
- return new FileRecordIterator<InternalRow>() {
-
- int currentPosition = 0;
-
- @Override
- public long returnedPosition() {
- return returnedPositions == null
- ? currentPosition
- : returnedPositions[currentPosition - 1];
- }
-
- @Override
- public Path filePath() {
- return filePath;
- }
-
- @Nullable
- @Override
- public InternalRow next() {
- if (currentPosition >= blobLengths.length) {
- return null;
- }
-
- Blob blob;
- long offset = blobOffsets[currentPosition] + 4;
- long length = blobLengths[currentPosition] - 16;
- if (!blobAsDescriptor) {
- blob = Blob.fromData(readInlineBlob(offset, length));
- } else {
- blob = Blob.fromFile(fileIO, filePath.toString(), offset,
length);
- }
- currentPosition++;
- return GenericRow.of(blob);
- }
+ public long blobLength(int i) {
+ return blobLengths[i];
+ }
- @Override
- public void releaseBatch() {}
- };
+ public long blobOffset(int i) {
+ return blobOffsets[i];
}
- @Override
- public void close() throws IOException {
- in.close();
+ public int returnedPosition(int i) {
+ return returnedPositions == null ? i : returnedPositions[i - 1];
}
- private byte[] readInlineBlob(long position, long length) {
- byte[] blobData = new byte[(int) length];
- try {
- in.seek(position);
- IOUtils.readFully(in, blobData);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return blobData;
+ public int recordNumber() {
+ return blobLengths.length;
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index f73c5431be..bed455fda7 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -24,85 +24,31 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.memory.BytesUtils;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.FileRecordReader;
-import org.apache.paimon.utils.DeltaVarintCompressor;
import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.RoaringBitmap32;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Iterator;
/** {@link FileRecordReader} for blob file. */
public class BlobFormatReader implements FileRecordReader<InternalRow> {
private final FileIO fileIO;
private final Path filePath;
- private final long[] blobLengths;
- private final long[] blobOffsets;
- private final int[] returnedPositions;
- private final SeekableInputStream in;
- private final boolean blobAsDescriptor;
+ private final BlobFileMeta fileMeta;
+ private final @Nullable SeekableInputStream in;
private boolean returned;
public BlobFormatReader(
- FileIO fileIO,
- Path filePath,
- long fileSize,
- @Nullable RoaringBitmap32 selection,
- boolean blobAsDescriptor)
- throws IOException {
+ FileIO fileIO, Path filePath, BlobFileMeta fileMeta, @Nullable
SeekableInputStream in) {
this.fileIO = fileIO;
this.filePath = filePath;
+ this.fileMeta = fileMeta;
+ this.in = in;
this.returned = false;
- this.blobAsDescriptor = blobAsDescriptor;
- this.in = fileIO.newInputStream(filePath);
-
- in.seek(fileSize - 5);
- byte[] header = new byte[5];
- IOUtils.readFully(in, header);
- byte version = header[4];
- if (version != 1) {
- throw new IOException("Unsupported version: " + version);
- }
- int indexLength = BytesUtils.getInt(header, 0);
-
- in.seek(fileSize - 5 - indexLength);
- byte[] indexBytes = new byte[indexLength];
- IOUtils.readFully(in, indexBytes);
-
- long[] blobLengths = DeltaVarintCompressor.decompress(indexBytes);
- long[] blobOffsets = new long[blobLengths.length];
- long offset = 0;
- for (int i = 0; i < blobLengths.length; i++) {
- blobOffsets[i] = offset;
- offset += blobLengths[i];
- }
-
- int[] returnedPositions = null;
- if (selection != null) {
- int cardinality = (int) selection.getCardinality();
- returnedPositions = new int[cardinality];
- long[] newLengths = new long[cardinality];
- long[] newOffsets = new long[cardinality];
- Iterator<Integer> iterator = selection.iterator();
- for (int i = 0; i < cardinality; i++) {
- Integer next = iterator.next();
- newLengths[i] = blobLengths[next];
- newOffsets[i] = blobOffsets[next];
- returnedPositions[i] = next;
- }
- blobLengths = newLengths;
- blobOffsets = newOffsets;
- }
-
- this.returnedPositions = returnedPositions;
- this.blobLengths = blobLengths;
- this.blobOffsets = blobOffsets;
}
@Nullable
@@ -119,9 +65,7 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
@Override
public long returnedPosition() {
- return returnedPositions == null
- ? currentPosition
- : returnedPositions[currentPosition - 1];
+ return fileMeta.returnedPosition(currentPosition);
}
@Override
@@ -132,15 +76,15 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
@Nullable
@Override
public InternalRow next() {
- if (currentPosition >= blobLengths.length) {
+ if (currentPosition >= fileMeta.recordNumber()) {
return null;
}
Blob blob;
- long offset = blobOffsets[currentPosition] + 4;
- long length = blobLengths[currentPosition] - 16;
- if (!blobAsDescriptor) {
- blob = Blob.fromData(readInlineBlob(offset, length));
+ long offset = fileMeta.blobOffset(currentPosition) + 4;
+ long length = fileMeta.blobLength(currentPosition) - 16;
+ if (in != null) {
+ blob = Blob.fromData(readInlineBlob(in, offset, length));
} else {
blob = Blob.fromFile(fileIO, filePath.toString(), offset,
length);
}
@@ -155,10 +99,10 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
@Override
public void close() throws IOException {
- in.close();
+ IOUtils.closeQuietly(in);
}
- private byte[] readInlineBlob(long position, long length) {
+ private static byte[] readInlineBlob(SeekableInputStream in, long
position, long length) {
byte[] blobData = new byte[(int) length];
try {
in.seek(position);