This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 65ce1e62b1 [format] read blobs as bytes when blob-as-descriptor is false (#6989)
65ce1e62b1 is described below

commit 65ce1e62b1a0f13adf03da927bf45b0b18e9faaa
Author: Faiz <[email protected]>
AuthorDate: Fri Jan 9 14:20:01 2026 +0800

    [format] read blobs as bytes when blob-as-descriptor is false (#6989)
---
 .../apache/paimon/format/blob/BlobFileFormat.java  |  21 +++-
 .../paimon/format/blob/BlobFileFormatFactory.java  |   4 +-
 .../paimon/format/blob/BlobFormatReader.java       | 118 ++++++++++++---------
 .../paimon/format/blob/BlobFileFormatTest.java     |  26 ++++-
 4 files changed, 115 insertions(+), 54 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
index 29f5df13df..dcf2e90fc9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -43,8 +43,15 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 /** {@link FileFormat} for blob file. */
 public class BlobFileFormat extends FileFormat {
 
+    private final boolean blobAsDescriptor;
+
     public BlobFileFormat() {
+        this(false);
+    }
+
+    public BlobFileFormat(boolean blobAsDescriptor) {
         super(BlobFileFormatFactory.IDENTIFIER);
+        this.blobAsDescriptor = blobAsDescriptor;
     }
 
     public static boolean isBlobFile(String fileName) {
@@ -56,7 +63,7 @@ public class BlobFileFormat extends FileFormat {
             RowType dataSchemaRowType,
             RowType projectedRowType,
             @Nullable List<Predicate> filters) {
-        return new BlobFormatReaderFactory();
+        return new BlobFormatReaderFactory(blobAsDescriptor);
     }
 
     @Override
@@ -89,10 +96,20 @@ public class BlobFileFormat extends FileFormat {
 
     private static class BlobFormatReaderFactory implements FormatReaderFactory {
 
+        private final boolean blobAsDescriptor;
+
+        public BlobFormatReaderFactory(boolean blobAsDescriptor) {
+            this.blobAsDescriptor = blobAsDescriptor;
+        }
+
         @Override
         public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
             return new BlobFormatReader(
-                    context.fileIO(), context.filePath(), context.fileSize(), context.selection());
+                    context.fileIO(),
+                    context.filePath(),
+                    context.fileSize(),
+                    context.selection(),
+                    blobAsDescriptor);
         }
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
index 84b0d9431e..2a54d49709 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.format.blob;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FileFormatFactory;
 
@@ -33,6 +34,7 @@ public class BlobFileFormatFactory implements FileFormatFactory {
 
     @Override
     public FileFormat create(FormatContext formatContext) {
-        return new BlobFileFormat();
+        boolean blobAsDescriptor = formatContext.options().get(CoreOptions.BLOB_AS_DESCRIPTOR);
+        return new BlobFileFormat(blobAsDescriptor);
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index eb860cbb17..f73c5431be 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -44,58 +44,65 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
     private final long[] blobLengths;
     private final long[] blobOffsets;
     private final int[] returnedPositions;
+    private final SeekableInputStream in;
+    private final boolean blobAsDescriptor;
 
     private boolean returned;
 
     public BlobFormatReader(
-            FileIO fileIO, Path filePath, long fileSize, @Nullable RoaringBitmap32 selection)
+            FileIO fileIO,
+            Path filePath,
+            long fileSize,
+            @Nullable RoaringBitmap32 selection,
+            boolean blobAsDescriptor)
             throws IOException {
         this.fileIO = fileIO;
         this.filePath = filePath;
         this.returned = false;
-        try (SeekableInputStream in = fileIO.newInputStream(filePath)) {
-            in.seek(fileSize - 5);
-            byte[] header = new byte[5];
-            IOUtils.readFully(in, header);
-            byte version = header[4];
-            if (version != 1) {
-                throw new IOException("Unsupported version: " + version);
-            }
-            int indexLength = BytesUtils.getInt(header, 0);
-
-            in.seek(fileSize - 5 - indexLength);
-            byte[] indexBytes = new byte[indexLength];
-            IOUtils.readFully(in, indexBytes);
-
-            long[] blobLengths = DeltaVarintCompressor.decompress(indexBytes);
-            long[] blobOffsets = new long[blobLengths.length];
-            long offset = 0;
-            for (int i = 0; i < blobLengths.length; i++) {
-                blobOffsets[i] = offset;
-                offset += blobLengths[i];
-            }
+        this.blobAsDescriptor = blobAsDescriptor;
+        this.in = fileIO.newInputStream(filePath);
+
+        in.seek(fileSize - 5);
+        byte[] header = new byte[5];
+        IOUtils.readFully(in, header);
+        byte version = header[4];
+        if (version != 1) {
+            throw new IOException("Unsupported version: " + version);
+        }
+        int indexLength = BytesUtils.getInt(header, 0);
+
+        in.seek(fileSize - 5 - indexLength);
+        byte[] indexBytes = new byte[indexLength];
+        IOUtils.readFully(in, indexBytes);
+
+        long[] blobLengths = DeltaVarintCompressor.decompress(indexBytes);
+        long[] blobOffsets = new long[blobLengths.length];
+        long offset = 0;
+        for (int i = 0; i < blobLengths.length; i++) {
+            blobOffsets[i] = offset;
+            offset += blobLengths[i];
+        }
 
-            int[] returnedPositions = null;
-            if (selection != null) {
-                int cardinality = (int) selection.getCardinality();
-                returnedPositions = new int[cardinality];
-                long[] newLengths = new long[cardinality];
-                long[] newOffsets = new long[cardinality];
-                Iterator<Integer> iterator = selection.iterator();
-                for (int i = 0; i < cardinality; i++) {
-                    Integer next = iterator.next();
-                    newLengths[i] = blobLengths[next];
-                    newOffsets[i] = blobOffsets[next];
-                    returnedPositions[i] = next;
-                }
-                blobLengths = newLengths;
-                blobOffsets = newOffsets;
+        int[] returnedPositions = null;
+        if (selection != null) {
+            int cardinality = (int) selection.getCardinality();
+            returnedPositions = new int[cardinality];
+            long[] newLengths = new long[cardinality];
+            long[] newOffsets = new long[cardinality];
+            Iterator<Integer> iterator = selection.iterator();
+            for (int i = 0; i < cardinality; i++) {
+                Integer next = iterator.next();
+                newLengths[i] = blobLengths[next];
+                newOffsets[i] = blobOffsets[next];
+                returnedPositions[i] = next;
             }
-
-            this.returnedPositions = returnedPositions;
-            this.blobLengths = blobLengths;
-            this.blobOffsets = blobOffsets;
+            blobLengths = newLengths;
+            blobOffsets = newOffsets;
         }
+
+        this.returnedPositions = returnedPositions;
+        this.blobLengths = blobLengths;
+        this.blobOffsets = blobOffsets;
     }
 
     @Nullable
@@ -129,12 +136,14 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
                     return null;
                 }
 
-                Blob blob =
-                        Blob.fromFile(
-                                fileIO,
-                                filePath.toString(),
-                                blobOffsets[currentPosition] + 4,
-                                blobLengths[currentPosition] - 16);
+                Blob blob;
+                long offset = blobOffsets[currentPosition] + 4;
+                long length = blobLengths[currentPosition] - 16;
+                if (!blobAsDescriptor) {
+                    blob = Blob.fromData(readInlineBlob(offset, length));
+                } else {
+                    blob = Blob.fromFile(fileIO, filePath.toString(), offset, length);
+                }
                 currentPosition++;
                 return GenericRow.of(blob);
             }
@@ -145,5 +154,18 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
     }
 
     @Override
-    public void close() throws IOException {}
+    public void close() throws IOException {
+        in.close();
+    }
+
+    private byte[] readInlineBlob(long position, long length) {
+        byte[] blobData = new byte[(int) length];
+        try {
+            in.seek(position);
+            IOUtils.readFully(in, blobData);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return blobData;
+    }
 }
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
index 005a78c24c..feaaacc5d1 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.format.blob;
 
+import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobRef;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
@@ -61,8 +63,17 @@ public class BlobFileFormatTest {
     }
 
     @Test
-    public void test() throws IOException {
-        BlobFileFormat format = new BlobFileFormat();
+    public void testBlobAsDescriptor() throws IOException {
+        innerTest(true);
+    }
+
+    @Test
+    public void testReadBlobInlineBytes() throws IOException {
+        innerTest(false);
+    }
+
+    private void innerTest(boolean blobAsDescriptor) throws IOException {
+        BlobFileFormat format = new BlobFileFormat(blobAsDescriptor);
         RowType rowType = RowType.of(DataTypes.BLOB());
 
         // write
@@ -83,7 +94,16 @@ public class BlobFileFormatTest {
         List<byte[]> result = new ArrayList<>();
         readerFactory
                 .createReader(context)
-                .forEachRemaining(row -> result.add(row.getBlob(0).toData()));
+                .forEachRemaining(
+                        row -> {
+                            Blob blob = row.getBlob(0);
+                            if (blobAsDescriptor) {
+                                assertThat(blob).isInstanceOf(BlobRef.class);
+                            } else {
+                                assertThat(blob).isInstanceOf(BlobData.class);
+                            }
+                            result.add(blob.toData());
+                        });
 
         // assert
         assertThat(result).containsExactlyElementsOf(blobs);

Reply via email to