This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 65ce1e62b1 [format] read blobs as bytes when blob-as-descriptor is false (#6989)
65ce1e62b1 is described below
commit 65ce1e62b1a0f13adf03da927bf45b0b18e9faaa
Author: Faiz <[email protected]>
AuthorDate: Fri Jan 9 14:20:01 2026 +0800
[format] read blobs as bytes when blob-as-descriptor is false (#6989)
---
.../apache/paimon/format/blob/BlobFileFormat.java | 21 +++-
.../paimon/format/blob/BlobFileFormatFactory.java | 4 +-
.../paimon/format/blob/BlobFormatReader.java | 118 ++++++++++++---------
.../paimon/format/blob/BlobFileFormatTest.java | 26 ++++-
4 files changed, 115 insertions(+), 54 deletions(-)
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
index 29f5df13df..dcf2e90fc9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -43,8 +43,15 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
/** {@link FileFormat} for blob file. */
public class BlobFileFormat extends FileFormat {
+ private final boolean blobAsDescriptor;
+
public BlobFileFormat() {
+ this(false);
+ }
+
+ public BlobFileFormat(boolean blobAsDescriptor) {
super(BlobFileFormatFactory.IDENTIFIER);
+ this.blobAsDescriptor = blobAsDescriptor;
}
public static boolean isBlobFile(String fileName) {
@@ -56,7 +63,7 @@ public class BlobFileFormat extends FileFormat {
RowType dataSchemaRowType,
RowType projectedRowType,
@Nullable List<Predicate> filters) {
- return new BlobFormatReaderFactory();
+ return new BlobFormatReaderFactory(blobAsDescriptor);
}
@Override
@@ -89,10 +96,20 @@ public class BlobFileFormat extends FileFormat {
private static class BlobFormatReaderFactory implements FormatReaderFactory {
+ private final boolean blobAsDescriptor;
+
+ public BlobFormatReaderFactory(boolean blobAsDescriptor) {
+ this.blobAsDescriptor = blobAsDescriptor;
+ }
+
@Override
public FileRecordReader<InternalRow> createReader(Context context)
throws IOException {
return new BlobFormatReader(
- context.fileIO(), context.filePath(), context.fileSize(), context.selection());
+ context.fileIO(),
+ context.filePath(),
+ context.fileSize(),
+ context.selection(),
+ blobAsDescriptor);
}
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
index 84b0d9431e..2a54d49709 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format.blob;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory;
@@ -33,6 +34,7 @@ public class BlobFileFormatFactory implements FileFormatFactory {
@Override
public FileFormat create(FormatContext formatContext) {
- return new BlobFileFormat();
+ boolean blobAsDescriptor = formatContext.options().get(CoreOptions.BLOB_AS_DESCRIPTOR);
+ return new BlobFileFormat(blobAsDescriptor);
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index eb860cbb17..f73c5431be 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -44,58 +44,65 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
private final long[] blobLengths;
private final long[] blobOffsets;
private final int[] returnedPositions;
+ private final SeekableInputStream in;
+ private final boolean blobAsDescriptor;
private boolean returned;
public BlobFormatReader(
- FileIO fileIO, Path filePath, long fileSize, @Nullable RoaringBitmap32 selection)
+ FileIO fileIO,
+ Path filePath,
+ long fileSize,
+ @Nullable RoaringBitmap32 selection,
+ boolean blobAsDescriptor)
throws IOException {
this.fileIO = fileIO;
this.filePath = filePath;
this.returned = false;
- try (SeekableInputStream in = fileIO.newInputStream(filePath)) {
- in.seek(fileSize - 5);
- byte[] header = new byte[5];
- IOUtils.readFully(in, header);
- byte version = header[4];
- if (version != 1) {
- throw new IOException("Unsupported version: " + version);
- }
- int indexLength = BytesUtils.getInt(header, 0);
-
- in.seek(fileSize - 5 - indexLength);
- byte[] indexBytes = new byte[indexLength];
- IOUtils.readFully(in, indexBytes);
-
- long[] blobLengths = DeltaVarintCompressor.decompress(indexBytes);
- long[] blobOffsets = new long[blobLengths.length];
- long offset = 0;
- for (int i = 0; i < blobLengths.length; i++) {
- blobOffsets[i] = offset;
- offset += blobLengths[i];
- }
+ this.blobAsDescriptor = blobAsDescriptor;
+ this.in = fileIO.newInputStream(filePath);
+
+ in.seek(fileSize - 5);
+ byte[] header = new byte[5];
+ IOUtils.readFully(in, header);
+ byte version = header[4];
+ if (version != 1) {
+ throw new IOException("Unsupported version: " + version);
+ }
+ int indexLength = BytesUtils.getInt(header, 0);
+
+ in.seek(fileSize - 5 - indexLength);
+ byte[] indexBytes = new byte[indexLength];
+ IOUtils.readFully(in, indexBytes);
+
+ long[] blobLengths = DeltaVarintCompressor.decompress(indexBytes);
+ long[] blobOffsets = new long[blobLengths.length];
+ long offset = 0;
+ for (int i = 0; i < blobLengths.length; i++) {
+ blobOffsets[i] = offset;
+ offset += blobLengths[i];
+ }
- int[] returnedPositions = null;
- if (selection != null) {
- int cardinality = (int) selection.getCardinality();
- returnedPositions = new int[cardinality];
- long[] newLengths = new long[cardinality];
- long[] newOffsets = new long[cardinality];
- Iterator<Integer> iterator = selection.iterator();
- for (int i = 0; i < cardinality; i++) {
- Integer next = iterator.next();
- newLengths[i] = blobLengths[next];
- newOffsets[i] = blobOffsets[next];
- returnedPositions[i] = next;
- }
- blobLengths = newLengths;
- blobOffsets = newOffsets;
+ int[] returnedPositions = null;
+ if (selection != null) {
+ int cardinality = (int) selection.getCardinality();
+ returnedPositions = new int[cardinality];
+ long[] newLengths = new long[cardinality];
+ long[] newOffsets = new long[cardinality];
+ Iterator<Integer> iterator = selection.iterator();
+ for (int i = 0; i < cardinality; i++) {
+ Integer next = iterator.next();
+ newLengths[i] = blobLengths[next];
+ newOffsets[i] = blobOffsets[next];
+ returnedPositions[i] = next;
}
-
- this.returnedPositions = returnedPositions;
- this.blobLengths = blobLengths;
- this.blobOffsets = blobOffsets;
+ blobLengths = newLengths;
+ blobOffsets = newOffsets;
}
+
+ this.returnedPositions = returnedPositions;
+ this.blobLengths = blobLengths;
+ this.blobOffsets = blobOffsets;
}
@Nullable
@@ -129,12 +136,14 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
return null;
}
- Blob blob =
- Blob.fromFile(
- fileIO,
- filePath.toString(),
- blobOffsets[currentPosition] + 4,
- blobLengths[currentPosition] - 16);
+ Blob blob;
+ long offset = blobOffsets[currentPosition] + 4;
+ long length = blobLengths[currentPosition] - 16;
+ if (!blobAsDescriptor) {
+ blob = Blob.fromData(readInlineBlob(offset, length));
+ } else {
+ blob = Blob.fromFile(fileIO, filePath.toString(), offset, length);
+ }
currentPosition++;
return GenericRow.of(blob);
}
@@ -145,5 +154,18 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
}
@Override
- public void close() throws IOException {}
+ public void close() throws IOException {
+ in.close();
+ }
+
+ private byte[] readInlineBlob(long position, long length) {
+ byte[] blobData = new byte[(int) length];
+ try {
+ in.seek(position);
+ IOUtils.readFully(in, blobData);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return blobData;
+ }
}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
index 005a78c24c..feaaacc5d1 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -18,7 +18,9 @@
package org.apache.paimon.format.blob;
+import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobRef;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
@@ -61,8 +63,17 @@ public class BlobFileFormatTest {
}
@Test
- public void test() throws IOException {
- BlobFileFormat format = new BlobFileFormat();
+ public void testBlobAsDescriptor() throws IOException {
+ innerTest(true);
+ }
+
+ @Test
+ public void testReadBlobInlineBytes() throws IOException {
+ innerTest(false);
+ }
+
+ private void innerTest(boolean blobAsDescriptor) throws IOException {
+ BlobFileFormat format = new BlobFileFormat(blobAsDescriptor);
RowType rowType = RowType.of(DataTypes.BLOB());
// write
@@ -83,7 +94,16 @@ public class BlobFileFormatTest {
List<byte[]> result = new ArrayList<>();
readerFactory
.createReader(context)
- .forEachRemaining(row -> result.add(row.getBlob(0).toData()));
+ .forEachRemaining(
+ row -> {
+ Blob blob = row.getBlob(0);
+ if (blobAsDescriptor) {
+ assertThat(blob).isInstanceOf(BlobRef.class);
+ } else {
+ assertThat(blob).isInstanceOf(BlobData.class);
+ }
+ result.add(blob.toData());
+ });
// assert
assertThat(result).containsExactlyElementsOf(blobs);