This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 523244cb1f [format] Introduce BlobFileFormat to blob storage (#6283)
523244cb1f is described below

commit 523244cb1fe94a8f5b4593b7a262574e587702ad
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Sep 19 14:42:15 2025 +0800

    [format] Introduce BlobFileFormat to blob storage (#6283)
---
 .../apache/paimon/format/EmptyStatsExtractor.java  |  40 ++++++
 .../paimon/fs/OffsetSeekableInputStream.java       |  72 ++++++++++
 .../java/org/apache/paimon/utils/StreamUtils.java  |  40 ++++++
 .../paimon/fs/OffsetSeekableInputStreamTest.java   | 149 +++++++++++++++++++++
 .../CompactedChangelogFormatReaderFactory.java     |  46 +------
 .../apache/paimon/format/blob/BlobFileFormat.java  |  94 +++++++++++++
 .../paimon/format/blob/BlobFileFormatFactory.java  |  38 ++++++
 .../org/apache/paimon/format/blob/BlobFileRef.java |  60 +++++++++
 .../paimon/format/blob/BlobFormatReader.java       | 141 +++++++++++++++++++
 .../paimon/format/blob/BlobFormatWriter.java       | 106 +++++++++++++++
 .../paimon/format/blob/BlobFileFormatTest.java     | 103 ++++++++++++++
 11 files changed, 844 insertions(+), 45 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/EmptyStatsExtractor.java 
b/paimon-common/src/main/java/org/apache/paimon/format/EmptyStatsExtractor.java
new file mode 100644
index 0000000000..5ee5a910df
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/EmptyStatsExtractor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.Pair;
+
+import java.io.IOException;
+
+/**
+ * A {@link SimpleStatsExtractor} that extracts no stats: {@link #extract} always returns an
+ * empty array and {@link #extractWithFileInfo} is not supported.
+ */
+public class EmptyStatsExtractor implements SimpleStatsExtractor {
+
+    /** Returns a zero-length array; none of the arguments are used and the file is not read. */
+    @Override
+    public SimpleColStats[] extract(FileIO fileIO, Path path, long length) 
throws IOException {
+        return new SimpleColStats[0];
+    }
+
+    /**
+     * Not supported by this extractor.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Pair<SimpleColStats[], FileInfo> extractWithFileInfo(
+            FileIO fileIO, Path path, long length) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
 
b/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
new file mode 100644
index 0000000000..c9fad5b457
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.io.IOException;
+
+/**
+ * A {@link SeekableInputStream} that exposes the window {@code [offset, offset + length)} of a
+ * wrapped {@link SeekableInputStream} as a stream of its own starting at position 0.
+ *
+ * <p>Positions passed to {@link #seek(long)} and returned by {@link #getPos()} are relative to
+ * {@code offset}. Reads never return bytes at or beyond relative position {@code length}.
+ * Closing this stream closes the wrapped stream.
+ */
+public class OffsetSeekableInputStream extends SeekableInputStream {
+
+    private final SeekableInputStream wrapped;
+    private final long offset;
+    private final long length;
+
+    /**
+     * Creates the view and positions the wrapped stream at {@code offset}.
+     *
+     * @param wrapped the underlying stream; {@link #close()} closes it
+     * @param offset absolute position in {@code wrapped} that maps to position 0 of this stream
+     * @param length number of bytes readable through this stream
+     * @throws IOException if seeking the wrapped stream to {@code offset} fails
+     */
+    public OffsetSeekableInputStream(SeekableInputStream wrapped, long offset, long length)
+            throws IOException {
+        this.wrapped = wrapped;
+        this.offset = offset;
+        this.length = length;
+        wrapped.seek(offset);
+    }
+
+    /**
+     * Seeks to a position relative to the start of the window. The position is not checked
+     * against {@code length}; reads issued at or past the end simply report end of stream.
+     */
+    @Override
+    public void seek(long desired) throws IOException {
+        wrapped.seek(offset + desired);
+    }
+
+    /** Returns the current position relative to the start of the window. */
+    @Override
+    public long getPos() throws IOException {
+        return wrapped.getPos() - offset;
+    }
+
+    /** Reads one byte, or returns -1 once the position has reached the end of the window. */
+    @Override
+    public int read() throws IOException {
+        if (getPos() >= length) {
+            return -1;
+        }
+        return wrapped.read();
+    }
+
+    /**
+     * Reads up to {@code len} bytes, never crossing the end of the window.
+     *
+     * @return the number of bytes read, 0 if {@code len} is 0, or -1 if the position is at or
+     *     past the end of the window
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        // InputStream contract: a zero-length request reads nothing and returns 0, not EOF.
+        if (len == 0) {
+            return 0;
+        }
+        long remaining = length - getPos();
+        // seek() does not clamp, so the position may be past the end; a negative remainder
+        // must be reported as EOF instead of being handed to the wrapped stream as a length.
+        if (remaining <= 0) {
+            return -1;
+        }
+        return wrapped.read(b, off, (int) Math.min(len, remaining));
+    }
+
+    /** Closes the wrapped stream. */
+    @Override
+    public void close() throws IOException {
+        wrapped.close();
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/StreamUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/StreamUtils.java
new file mode 100644
index 0000000000..6d0a8fb18d
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StreamUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/** A utility class for encoding primitive values as little-endian byte arrays. */
+public class StreamUtils {
+
+    /**
+     * Encodes an {@code int} in little-endian byte order.
+     *
+     * @param value the value to encode
+     * @return a new array of {@link Integer#BYTES} bytes, least significant byte first
+     */
+    public static byte[] intToLittleEndian(int value) {
+        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+        buffer.order(ByteOrder.LITTLE_ENDIAN);
+        buffer.putInt(value);
+        return buffer.array();
+    }
+
+    /**
+     * Encodes a {@code long} in little-endian byte order.
+     *
+     * @param value the value to encode
+     * @return a new array of {@link Long#BYTES} bytes, least significant byte first
+     */
+    public static byte[] longToLittleEndian(long value) {
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.order(ByteOrder.LITTLE_ENDIAN);
+        buffer.putLong(value);
+        return buffer.array();
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
new file mode 100644
index 0000000000..fa8a14ced5
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link OffsetSeekableInputStream}.
+ *
+ * <p>Every test wraps a 20-byte stream whose byte at index {@code i} has value {@code i}, and
+ * views it through a window with offset 5 and length 10.
+ */
+public class OffsetSeekableInputStreamTest {
+
+    private byte[] testData;
+    private ByteArraySeekableStream wrapped;
+
+    /** Builds the 20-byte fixture (value == index) and a fresh wrapped stream over it. */
+    @BeforeEach
+    public void setUp() {
+        testData = new byte[20];
+        for (int i = 0; i < testData.length; i++) {
+            testData[i] = (byte) i;
+        }
+        wrapped = new ByteArraySeekableStream(testData);
+    }
+
+    /** The constructor seeks the wrapped stream to the offset; the view starts at position 0. */
+    @Test
+    public void testConstructor() throws IOException {
+        long offset = 5;
+        long length = 10;
+        try (OffsetSeekableInputStream stream =
+                new OffsetSeekableInputStream(wrapped, offset, length)) {
+            assertThat(wrapped.getPos()).isEqualTo(offset);
+            assertThat(stream.getPos()).isZero();
+        }
+    }
+
+    /** Seeking in the view moves the wrapped stream to offset + position, including the end. */
+    @Test
+    public void testGetPosAndSeek() throws IOException {
+        long offset = 5;
+        long length = 10;
+        try (OffsetSeekableInputStream stream =
+                new OffsetSeekableInputStream(wrapped, offset, length)) {
+            stream.seek(3);
+            assertThat(stream.getPos()).isEqualTo(3);
+            assertThat(wrapped.getPos()).isEqualTo(offset + 3);
+
+            stream.seek(length);
+            assertThat(stream.getPos()).isEqualTo(length);
+            assertThat(wrapped.getPos()).isEqualTo(offset + length);
+
+            stream.seek(0);
+            assertThat(stream.getPos()).isZero();
+            assertThat(wrapped.getPos()).isEqualTo(offset);
+        }
+    }
+
+    /** Single-byte reads return the first and the last byte of the window. */
+    @Test
+    public void testReadSingleByte() throws IOException {
+        long offset = 5;
+        long length = 10;
+        try (OffsetSeekableInputStream stream =
+                new OffsetSeekableInputStream(wrapped, offset, length)) {
+            assertThat(stream.read()).isEqualTo(testData[5]);
+            assertThat(stream.getPos()).isEqualTo(1);
+            assertThat(wrapped.getPos()).isEqualTo(offset + 1);
+
+            stream.seek(length - 1);
+            assertThat(stream.read()).isEqualTo(testData[(int) (offset + 
length - 1)]);
+            assertThat(stream.getPos()).isEqualTo(length);
+        }
+    }
+
+    /** A single-byte read at the end of the window returns -1 although the file has more data. */
+    @Test
+    public void testReadSingleByteAtEnd() throws IOException {
+        long offset = 5;
+        long length = 10;
+        try (OffsetSeekableInputStream stream =
+                new OffsetSeekableInputStream(wrapped, offset, length)) {
+            stream.seek(length);
+            assertThat(stream.read()).isEqualTo(-1);
+        }
+    }
+
+    /** A bulk read inside the window returns the requested bytes and advances both positions. */
+    @Test
+    public void testReadByteArray() throws IOException {
+        long offset = 5;
+        long length = 10;
+        try (OffsetSeekableInputStream stream =
+                new OffsetSeekableInputStream(wrapped, offset, length)) {
+            byte[] buffer = new byte[5];
+            int bytesRead = stream.read(buffer, 0, 5);
+
+            assertThat(bytesRead).isEqualTo(5);
+            assertThat(buffer).containsExactly(Arrays.copyOfRange(testData, 5, 
10));
+            assertThat(stream.getPos()).isEqualTo(5);
+            assertThat(wrapped.getPos()).isEqualTo(offset + 5);
+        }
+    }
+
+    /** A bulk read that would cross the end of the window is truncated to the remaining bytes. */
+    @Test
+    public void testReadByteArrayHittingEnd() throws IOException {
+        long offset = 5;
+        long length = 10;
+        try (OffsetSeekableInputStream stream =
+                new OffsetSeekableInputStream(wrapped, offset, length)) {
+            stream.seek(7);
+            byte[] buffer = new byte[5]; // Request more than available
+            int bytesRead = stream.read(buffer, 0, 5);
+
+            assertThat(bytesRead).isEqualTo(3); // Only 3 bytes should be read 
(10 - 7)
+            byte[] expected = new byte[5];
+            System.arraycopy(testData, 12, expected, 0, 3);
+            assertThat(buffer).isEqualTo(expected);
+            assertThat(stream.getPos()).isEqualTo(length);
+            assertThat(wrapped.getPos()).isEqualTo(offset + length);
+        }
+    }
+
+    /** A bulk read issued exactly at the end of the window returns -1. */
+    @Test
+    public void testReadByteArrayAtEnd() throws IOException {
+        long offset = 5;
+        long length = 10;
+        try (OffsetSeekableInputStream stream =
+                new OffsetSeekableInputStream(wrapped, offset, length)) {
+            stream.seek(length);
+            byte[] buffer = new byte[5];
+            int bytesRead = stream.read(buffer, 0, 5);
+            assertThat(bytesRead).isEqualTo(-1);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
index 70d2555cf3..44afb082ef 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.OffsetSeekableInputStream;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;
@@ -30,7 +31,6 @@ import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.utils.CompactedChangelogPathResolver;
 import org.apache.paimon.utils.RoaringBitmap32;
 
-import java.io.EOFException;
 import java.io.IOException;
 
 /**
@@ -166,48 +166,4 @@ public class CompactedChangelogFormatReaderFactory 
implements FormatReaderFactor
             throw new UnsupportedOperationException();
         }
     }
-
-    private static class OffsetSeekableInputStream extends SeekableInputStream 
{
-
-        private final SeekableInputStream wrapped;
-        private final long offset;
-        private final long length;
-
-        private OffsetSeekableInputStream(SeekableInputStream wrapped, long 
offset, long length)
-                throws IOException {
-            this.wrapped = wrapped;
-            this.offset = offset;
-            this.length = length;
-            wrapped.seek(offset);
-        }
-
-        @Override
-        public void seek(long desired) throws IOException {
-            wrapped.seek(offset + desired);
-        }
-
-        @Override
-        public long getPos() throws IOException {
-            return wrapped.getPos() - offset;
-        }
-
-        @Override
-        public int read() throws IOException {
-            if (getPos() >= length) {
-                throw new EOFException();
-            }
-            return wrapped.read();
-        }
-
-        @Override
-        public int read(byte[] b, int off, int len) throws IOException {
-            long realLen = Math.min(len, length - getPos());
-            return wrapped.read(b, off, (int) realLen);
-        }
-
-        @Override
-        public void close() throws IOException {
-            wrapped.close();
-        }
-    }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
new file mode 100644
index 0000000000..f6b5345ebf
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.EmptyStatsExtractor;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * {@link FileFormat} for blob files, registered under {@link BlobFileFormatFactory#IDENTIFIER}.
+ *
+ * <p>Accepts only row types with a single field of type root {@link DataTypeRoot#BLOB} (see
+ * {@link #validateDataFields(RowType)}).
+ */
+public class BlobFileFormat extends FileFormat {
+
+    public BlobFileFormat() {
+        super(BlobFileFormatFactory.IDENTIFIER);
+    }
+
+    /**
+     * Creates a reader factory producing {@link BlobFormatReader}s. The schema, projection and
+     * filters are not used by this format.
+     */
+    @Override
+    public FormatReaderFactory createReaderFactory(
+            RowType dataSchemaRowType,
+            RowType projectedRowType,
+            @Nullable List<Predicate> filters) {
+        return new BlobFormatReaderFactory();
+    }
+
+    /** Creates a writer factory producing {@link BlobFormatWriter}s; {@code type} is not used. */
+    @Override
+    public FormatWriterFactory createWriterFactory(RowType type) {
+        return new BlobFormatWriterFactory();
+    }
+
+    /**
+     * Checks that {@code rowType} has exactly one field and that its type root is {@link
+     * DataTypeRoot#BLOB}; a violation is reported through {@code checkArgument}.
+     */
+    @Override
+    public void validateDataFields(RowType rowType) {
+        checkArgument(rowType.getFieldCount() == 1, "BlobFileFormat only 
support one field.");
+        checkArgument(
+                rowType.getField(0).type().getTypeRoot() == DataTypeRoot.BLOB,
+                "BlobFileFormat only support blob type.");
+    }
+
+    /** Always returns an {@link EmptyStatsExtractor}; both arguments are ignored. */
+    @Override
+    public Optional<SimpleStatsExtractor> createStatsExtractor(
+            RowType type, SimpleColStatsCollector.Factory[] statsCollectors) {
+        // return an empty stats extractor to avoid stats calculation
+        return Optional.of(new EmptyStatsExtractor());
+    }
+
+    /** Writer factory for blob files; the compression argument is ignored. */
+    private static class BlobFormatWriterFactory implements 
FormatWriterFactory {
+
+        @Override
+        public FormatWriter create(PositionOutputStream out, String 
compression) {
+            return new BlobFormatWriter(out);
+        }
+    }
+
+    /** Reader factory that forwards file IO, path, size and row selection from the context. */
+    private static class BlobFormatReaderFactory implements 
FormatReaderFactory {
+
+        @Override
+        public FileRecordReader<InternalRow> createReader(Context context) 
throws IOException {
+            return new BlobFormatReader(
+                    context.fileIO(), context.filePath(), context.fileSize(), 
context.selection());
+        }
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
new file mode 100644
index 0000000000..84b0d9431e
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+
+/** Factory to create {@link BlobFileFormat}, identified by {@link #IDENTIFIER}. */
+public class BlobFileFormatFactory implements FileFormatFactory {
+
+    /** Identifier of the blob file format. */
+    public static final String IDENTIFIER = "blob";
+
+    /** Returns {@link #IDENTIFIER}. */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /** Creates a new {@link BlobFileFormat}; {@code formatContext} is not used. */
+    @Override
+    public FileFormat create(FormatContext formatContext) {
+        return new BlobFileFormat();
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java
new file mode 100644
index 0000000000..85e98e3ab8
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.OffsetSeekableInputStream;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.IOUtils;
+
+import java.io.IOException;
+
+/**
+ * A {@link Blob} backed by a byte range of a blob file. The content is read lazily: a stream on
+ * the file is only opened by {@link #newInputStream()} (and therefore by {@link #toBytes()}).
+ */
+public class BlobFileRef implements Blob {
+
+    private final FileIO fileIO;
+    private final Path filePath;
+    private final long blobLength;
+    private final long blobOffset;
+
+    /**
+     * @param fileIO file IO used to open {@code filePath}
+     * @param filePath the blob file containing this entry
+     * @param blobLength length in bytes of the whole entry as stored in the file
+     * @param blobOffset offset in bytes of the entry from the start of the file
+     */
+    public BlobFileRef(FileIO fileIO, Path filePath, long blobLength, long blobOffset) {
+        this.fileIO = fileIO;
+        this.filePath = filePath;
+        this.blobLength = blobLength;
+        this.blobOffset = blobOffset;
+    }
+
+    /**
+     * Reads the blob content into memory.
+     *
+     * @throws RuntimeException wrapping any {@link IOException} raised while reading
+     */
+    @Override
+    public byte[] toBytes() {
+        try {
+            return IOUtils.readFully(newInputStream(), true);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Opens the file and returns a stream limited to this blob's content. The caller must close
+     * the returned stream, which also closes the underlying file stream.
+     *
+     * @throws IOException if the file cannot be opened or positioned at the blob
+     */
+    @Override
+    public SeekableInputStream newInputStream() throws IOException {
+        SeekableInputStream in = fileIO.newInputStream(filePath);
+        try {
+            // TODO validate Magic number?
+            // NOTE(review): +4 presumably skips the leading magic number and -16 excludes the
+            // magic number plus trailing per-entry bytes; confirm against BlobFormatWriter.
+            return new OffsetSeekableInputStream(in, blobOffset + 4, blobLength - 16);
+        } catch (IOException | RuntimeException e) {
+            // The wrapper's constructor seeks and may fail; close the file stream we just
+            // opened so it is not leaked, keeping the original failure as the primary one.
+            try {
+                in.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
new file mode 100644
index 0000000000..4a1a1d6444
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.memory.BytesUtils;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.utils.DeltaVarintCompressor;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * {@link FileRecordReader} for blob file.
+ *
+ * <p>The constructor reads only the index at the end of the file; blob contents are not read
+ * here. Each returned row holds a single {@link BlobFileRef} pointing at the blob's byte range.
+ */
+public class BlobFormatReader implements FileRecordReader<InternalRow> {
+
+    private final FileIO fileIO;
+    private final Path filePath;
+    // Length and start offset of each blob entry to return, after applying the selection.
+    private final long[] blobLengths;
+    private final long[] blobOffsets;
+
+    // Whether the single batch has already been handed out by readBatch().
+    private boolean returned;
+
+    /**
+     * Reads the file footer and index and computes the offset of every blob entry.
+     *
+     * @param fileIO file IO used to open the file now and by the returned blob references later
+     * @param filePath path of the blob file
+     * @param fileSize total size of the file in bytes, used to locate the footer
+     * @param selection indexes of the entries to return, or {@code null} to return all entries
+     * @throws IOException if the file cannot be read or the footer version is not 1
+     */
+    public BlobFormatReader(
+            FileIO fileIO, Path filePath, long fileSize, @Nullable 
RoaringBitmap32 selection)
+            throws IOException {
+        this.fileIO = fileIO;
+        this.filePath = filePath;
+        this.returned = false;
+        try (SeekableInputStream in = fileIO.newInputStream(filePath)) {
+            // Footer: the last 5 bytes are a 4-byte index length followed by a 1-byte version.
+            // NOTE(review): fileSize < 5 and a negative or oversized indexLength are not
+            // checked here; confirm whether callers guarantee a well-formed file.
+            in.seek(fileSize - 5);
+            byte[] header = new byte[5];
+            IOUtils.readFully(in, header);
+            byte version = header[4];
+            if (version != 1) {
+                throw new IOException("Unsupported version: " + version);
+            }
+            int indexLength = BytesUtils.getInt(header, 0);
+
+            // The index occupies the indexLength bytes immediately before the footer.
+            in.seek(fileSize - 5 - indexLength);
+            byte[] indexBytes = new byte[indexLength];
+            IOUtils.readFully(in, indexBytes);
+
+            // The index stores one length per entry; entries are laid out back to back from
+            // offset 0, so each offset is the running sum of the preceding lengths.
+            long[] blobLengths = DeltaVarintCompressor.decompress(indexBytes);
+            long[] blobOffsets = new long[blobLengths.length];
+            long offset = 0;
+            for (int i = 0; i < blobLengths.length; i++) {
+                blobOffsets[i] = offset;
+                offset += blobLengths[i];
+            }
+
+            // Keep only the selected entries, in the bitmap's iteration order.
+            if (selection != null) {
+                int cardinality = (int) selection.getCardinality();
+                long[] newLengths = new long[cardinality];
+                long[] newOffsets = new long[cardinality];
+                Iterator<Integer> iterator = selection.iterator();
+                for (int i = 0; i < cardinality; i++) {
+                    Integer next = iterator.next();
+                    newLengths[i] = blobLengths[next];
+                    newOffsets[i] = blobOffsets[next];
+                }
+                blobLengths = newLengths;
+                blobOffsets = newOffsets;
+            }
+
+            this.blobLengths = blobLengths;
+            this.blobOffsets = blobOffsets;
+        }
+    }
+
+    /**
+     * Returns one iterator over all (selected) entries on the first call and {@code null} on
+     * every later call.
+     */
+    @Nullable
+    @Override
+    public FileRecordIterator<InternalRow> readBatch() throws IOException {
+        if (returned) {
+            return null;
+        }
+
+        returned = true;
+        return new FileRecordIterator<InternalRow>() {
+
+            // Index into blobLengths/blobOffsets of the next entry to return.
+            int currentPosition = 0;
+
+            // NOTE(review): this is the index of the next entry within the filtered arrays,
+            // not the entry's index in the file; verify this matches what FileRecordIterator
+            // expects, especially when a selection is applied.
+            @Override
+            public long returnedPosition() {
+                return currentPosition;
+            }
+
+            @Override
+            public Path filePath() {
+                return filePath;
+            }
+
+            /** Returns a one-field row wrapping the next blob reference, or null when done. */
+            @Nullable
+            @Override
+            public InternalRow next() {
+                if (currentPosition >= blobLengths.length) {
+                    return null;
+                }
+
+                BlobFileRef blobRef =
+                        new BlobFileRef(
+                                fileIO,
+                                filePath,
+                                blobLengths[currentPosition],
+                                blobOffsets[currentPosition]);
+                currentPosition++;
+                return GenericRow.of(blobRef);
+            }
+
+            @Override
+            public void releaseBatch() {}
+        };
+    }
+
+    /** No-op: the stream used to read the index is already closed by the constructor. */
+    @Override
+    public void close() throws IOException {}
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
new file mode 100644
index 0000000000..a1b1c113d5
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.DeltaVarintCompressor;
+import org.apache.paimon.utils.LongArrayList;
+
+import java.io.IOException;
+import java.util.zip.CRC32;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.StreamUtils.intToLittleEndian;
+import static org.apache.paimon.utils.StreamUtils.longToLittleEndian;
+
+/**
+ * {@link FormatWriter} for blob file.
+ *
+ * <p>Every blob passed to {@link #addElement(InternalRow)} is appended as one record:
+ *
+ * <ol>
+ *   <li>the bytes of {@link #MAGIC_NUMBER_BYTES};
+ *   <li>the blob content, copied from {@link Blob#newInputStream()};
+ *   <li>the total record length, encoded with {@code longToLittleEndian};
+ *   <li>the CRC32 of everything in the record written before it, encoded with
+ *       {@code intToLittleEndian}.
+ * </ol>
+ *
+ * <p>{@link #close()} appends the {@link DeltaVarintCompressor}-compressed list of record
+ * lengths, then the length of that index and the single {@link #VERSION} byte.
+ */
+public class BlobFormatWriter implements FormatWriter {
+
+    public static final byte VERSION = 1;
+    public static final int MAGIC_NUMBER = 1481511375;
+    public static final byte[] MAGIC_NUMBER_BYTES = intToLittleEndian(MAGIC_NUMBER);
+
+    private final PositionOutputStream out;
+    // Checksum of the record currently being written; reset at the start of every record.
+    private final CRC32 crc32;
+    // Scratch buffer used to copy blob content to the output.
+    private final byte[] tmpBuffer;
+    // Total length of every record written so far, in write order; becomes the file index.
+    private final LongArrayList lengths;
+    // Set once the index and header have been written, so they are never written twice.
+    private boolean closed;
+
+    public BlobFormatWriter(PositionOutputStream out) {
+        this.out = out;
+        this.crc32 = new CRC32();
+        this.tmpBuffer = new byte[4096];
+        this.lengths = new LongArrayList(16);
+    }
+
+    /**
+     * Appends the blob stored in the single field of {@code element} as one record.
+     *
+     * @param element row with exactly one field, which must hold a non-null blob
+     * @throws IOException if reading the blob or writing to the output fails
+     */
+    @Override
+    public void addElement(InternalRow element) throws IOException {
+        checkArgument(element.getFieldCount() == 1, "BlobFormatWriter only support one field.");
+        checkArgument(!element.isNullAt(0), "BlobFormatWriter only support non-null blob.");
+        Blob blob = element.getBlob(0);
+
+        long previousPos = out.getPos();
+        crc32.reset();
+
+        write(MAGIC_NUMBER_BYTES);
+        try (SeekableInputStream in = blob.newInputStream()) {
+            int bytesRead = in.read(tmpBuffer);
+            while (bytesRead >= 0) {
+                write(tmpBuffer, bytesRead);
+                bytesRead = in.read(tmpBuffer);
+            }
+        }
+
+        // 12 accounts for the two trailing fields that are not written yet:
+        // the 8-byte length and the 4-byte CRC.
+        long binLength = out.getPos() - previousPos + 12;
+        lengths.add(binLength);
+        byte[] lenBytes = longToLittleEndian(binLength);
+        write(lenBytes);
+        // The CRC itself is written straight to the stream, so it is not part of the checksum.
+        int crcValue = (int) crc32.getValue();
+        out.write(intToLittleEndian(crcValue));
+    }
+
+    /** Writes all of {@code bytes} to the output and folds them into the record checksum. */
+    private void write(byte[] bytes) throws IOException {
+        write(bytes, bytes.length);
+    }
+
+    /** Writes the first {@code length} bytes of {@code bytes}, updating the record checksum. */
+    private void write(byte[] bytes, int length) throws IOException {
+        crc32.update(bytes, 0, length);
+        out.write(bytes, 0, length);
+    }
+
+    /**
+     * Reports whether the output position has reached {@code targetSize}.
+     *
+     * @param suggestedCheck ignored; the position is checked on every call
+     * @param targetSize size in bytes at which the file is considered full
+     */
+    @Override
+    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
+        // check target size every record
+        // Each blob is very large, so the cost of check is not high
+        return out.getPos() >= targetSize;
+    }
+
+    /**
+     * Appends the length index and the trailing header. The underlying stream is not closed
+     * here.
+     *
+     * <p>Calling this method again has no effect, so the index and header are written at most
+     * once.
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+        closed = true;
+
+        // index
+        byte[] indexBytes = DeltaVarintCompressor.compress(lengths.toArray());
+        out.write(indexBytes);
+        // header
+        out.write(intToLittleEndian(indexBytes.length));
+        out.write(VERSION);
+    }
+}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
new file mode 100644
index 0000000000..d30a669a29
--- /dev/null
+++ b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link BlobFileFormat}.
+ *
+ * <p>Writes two blobs into a local file, reads them back in full, then reads them again with a
+ * row selection that keeps only the second blob.
+ */
+public class BlobFileFormatTest {
+
+    @TempDir java.nio.file.Path tempPath;
+
+    protected FileIO fileIO;
+    protected Path file;
+    protected Path parent;
+
+    /** Points {@link #file} at a fresh, randomly named path inside the temporary directory. */
+    @BeforeEach
+    public void beforeEach() {
+        this.fileIO = LocalFileIO.create();
+        this.parent = new Path(tempPath.toUri());
+        this.file = new Path(parent, UUID.randomUUID().toString());
+    }
+
+    @Test
+    public void test() throws IOException {
+        BlobFileFormat format = new BlobFileFormat();
+        RowType rowType = RowType.of(DataTypes.BLOB());
+
+        // write
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        List<byte[]> blobs = Arrays.asList("hello".getBytes(), "world".getBytes());
+        try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
+            FormatWriter formatWriter = writerFactory.create(out, null);
+            for (byte[] bytes : blobs) {
+                formatWriter.addElement(GenericRow.of(new BlobData(bytes)));
+            }
+            // The writer appends its index and header on close, before the stream is closed.
+            formatWriter.close();
+        }
+
+        // read
+        FormatReaderFactory readerFactory = format.createReaderFactory(null, null, null);
+        FormatReaderContext context =
+                new FormatReaderContext(fileIO, file, fileIO.getFileSize(file));
+        List<byte[]> result = new ArrayList<>();
+        readerFactory
+                .createReader(context)
+                .forEachRemaining(row -> result.add(row.getBlob(0).toBytes()));
+
+        // assert
+        assertThat(result).containsExactlyElementsOf(blobs);
+
+        // read with selection
+        RoaringBitmap32 selection = new RoaringBitmap32();
+        selection.add(1);
+        context = new FormatReaderContext(fileIO, file, fileIO.getFileSize(file), selection);
+        result.clear();
+        readerFactory
+                .createReader(context)
+                .forEachRemaining(row -> result.add(row.getBlob(0).toBytes()));
+
+        // assert
+        assertThat(result).containsOnly(blobs.get(1));
+    }
+}


Reply via email to