This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 523244cb1f [format] Introduce BlobFileFormat to blob storage (#6283)
523244cb1f is described below
commit 523244cb1fe94a8f5b4593b7a262574e587702ad
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Sep 19 14:42:15 2025 +0800
[format] Introduce BlobFileFormat to blob storage (#6283)
---
.../apache/paimon/format/EmptyStatsExtractor.java | 40 ++++++
.../paimon/fs/OffsetSeekableInputStream.java | 72 ++++++++++
.../java/org/apache/paimon/utils/StreamUtils.java | 40 ++++++
.../paimon/fs/OffsetSeekableInputStreamTest.java | 149 +++++++++++++++++++++
.../CompactedChangelogFormatReaderFactory.java | 46 +------
.../apache/paimon/format/blob/BlobFileFormat.java | 94 +++++++++++++
.../paimon/format/blob/BlobFileFormatFactory.java | 38 ++++++
.../org/apache/paimon/format/blob/BlobFileRef.java | 60 +++++++++
.../paimon/format/blob/BlobFormatReader.java | 141 +++++++++++++++++++
.../paimon/format/blob/BlobFormatWriter.java | 106 +++++++++++++++
.../paimon/format/blob/BlobFileFormatTest.java | 103 ++++++++++++++
11 files changed, 844 insertions(+), 45 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/EmptyStatsExtractor.java
b/paimon-common/src/main/java/org/apache/paimon/format/EmptyStatsExtractor.java
new file mode 100644
index 0000000000..5ee5a910df
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/format/EmptyStatsExtractor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.Pair;
+
+import java.io.IOException;
+
+/** An extractor that extracts no stats. */
+public class EmptyStatsExtractor implements SimpleStatsExtractor {
+
+ @Override
+ public SimpleColStats[] extract(FileIO fileIO, Path path, long length)
throws IOException {
+ return new SimpleColStats[0];
+ }
+
+ @Override
+ public Pair<SimpleColStats[], FileInfo> extractWithFileInfo(
+ FileIO fileIO, Path path, long length) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
b/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
new file mode 100644
index 0000000000..c9fad5b457
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.io.IOException;
+
+/**
+ * A {@link SeekableInputStream} wrapping another {@link SeekableInputStream}
with offset and
+ * length.
+ */
+public class OffsetSeekableInputStream extends SeekableInputStream {
+
+ private final SeekableInputStream wrapped;
+ private final long offset;
+ private final long length;
+
+ public OffsetSeekableInputStream(SeekableInputStream wrapped, long offset,
long length)
+ throws IOException {
+ this.wrapped = wrapped;
+ this.offset = offset;
+ this.length = length;
+ wrapped.seek(offset);
+ }
+
+ @Override
+ public void seek(long desired) throws IOException {
+ wrapped.seek(offset + desired);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return wrapped.getPos() - offset;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (getPos() >= length) {
+ return -1;
+ }
+ return wrapped.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ long realLen = Math.min(len, length - getPos());
+ if (realLen == 0) {
+ return -1;
+ }
+ return wrapped.read(b, off, (int) realLen);
+ }
+
+ @Override
+ public void close() throws IOException {
+ wrapped.close();
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/StreamUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/StreamUtils.java
new file mode 100644
index 0000000000..6d0a8fb18d
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StreamUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
/** A utility class for encoding primitive values as byte arrays for stream I/O. */
public class StreamUtils {

    private StreamUtils() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * Encodes {@code value} as 4 bytes in little-endian order (least significant byte first).
     *
     * @param value the int to encode
     * @return a new 4-byte array
     */
    public static byte[] intToLittleEndian(int value) {
        return ByteBuffer.allocate(Integer.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putInt(value)
                .array();
    }

    /**
     * Encodes {@code value} as 8 bytes in little-endian order (least significant byte first).
     *
     * @param value the long to encode
     * @return a new 8-byte array
     */
    public static byte[] longToLittleEndian(long value) {
        return ByteBuffer.allocate(Long.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putLong(value)
                .array();
    }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
new file mode 100644
index 0000000000..fa8a14ced5
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link OffsetSeekableInputStream}. */
+public class OffsetSeekableInputStreamTest {
+
+ private byte[] testData;
+ private ByteArraySeekableStream wrapped;
+
+ @BeforeEach
+ public void setUp() {
+ testData = new byte[20];
+ for (int i = 0; i < testData.length; i++) {
+ testData[i] = (byte) i;
+ }
+ wrapped = new ByteArraySeekableStream(testData);
+ }
+
+ @Test
+ public void testConstructor() throws IOException {
+ long offset = 5;
+ long length = 10;
+ try (OffsetSeekableInputStream stream =
+ new OffsetSeekableInputStream(wrapped, offset, length)) {
+ assertThat(wrapped.getPos()).isEqualTo(offset);
+ assertThat(stream.getPos()).isZero();
+ }
+ }
+
+ @Test
+ public void testGetPosAndSeek() throws IOException {
+ long offset = 5;
+ long length = 10;
+ try (OffsetSeekableInputStream stream =
+ new OffsetSeekableInputStream(wrapped, offset, length)) {
+ stream.seek(3);
+ assertThat(stream.getPos()).isEqualTo(3);
+ assertThat(wrapped.getPos()).isEqualTo(offset + 3);
+
+ stream.seek(length);
+ assertThat(stream.getPos()).isEqualTo(length);
+ assertThat(wrapped.getPos()).isEqualTo(offset + length);
+
+ stream.seek(0);
+ assertThat(stream.getPos()).isZero();
+ assertThat(wrapped.getPos()).isEqualTo(offset);
+ }
+ }
+
+ @Test
+ public void testReadSingleByte() throws IOException {
+ long offset = 5;
+ long length = 10;
+ try (OffsetSeekableInputStream stream =
+ new OffsetSeekableInputStream(wrapped, offset, length)) {
+ assertThat(stream.read()).isEqualTo(testData[5]);
+ assertThat(stream.getPos()).isEqualTo(1);
+ assertThat(wrapped.getPos()).isEqualTo(offset + 1);
+
+ stream.seek(length - 1);
+ assertThat(stream.read()).isEqualTo(testData[(int) (offset +
length - 1)]);
+ assertThat(stream.getPos()).isEqualTo(length);
+ }
+ }
+
+ @Test
+ public void testReadSingleByteAtEnd() throws IOException {
+ long offset = 5;
+ long length = 10;
+ try (OffsetSeekableInputStream stream =
+ new OffsetSeekableInputStream(wrapped, offset, length)) {
+ stream.seek(length);
+ assertThat(stream.read()).isEqualTo(-1);
+ }
+ }
+
+ @Test
+ public void testReadByteArray() throws IOException {
+ long offset = 5;
+ long length = 10;
+ try (OffsetSeekableInputStream stream =
+ new OffsetSeekableInputStream(wrapped, offset, length)) {
+ byte[] buffer = new byte[5];
+ int bytesRead = stream.read(buffer, 0, 5);
+
+ assertThat(bytesRead).isEqualTo(5);
+ assertThat(buffer).containsExactly(Arrays.copyOfRange(testData, 5,
10));
+ assertThat(stream.getPos()).isEqualTo(5);
+ assertThat(wrapped.getPos()).isEqualTo(offset + 5);
+ }
+ }
+
+ @Test
+ public void testReadByteArrayHittingEnd() throws IOException {
+ long offset = 5;
+ long length = 10;
+ try (OffsetSeekableInputStream stream =
+ new OffsetSeekableInputStream(wrapped, offset, length)) {
+ stream.seek(7);
+ byte[] buffer = new byte[5]; // Request more than available
+ int bytesRead = stream.read(buffer, 0, 5);
+
+ assertThat(bytesRead).isEqualTo(3); // Only 3 bytes should be read
(10 - 7)
+ byte[] expected = new byte[5];
+ System.arraycopy(testData, 12, expected, 0, 3);
+ assertThat(buffer).isEqualTo(expected);
+ assertThat(stream.getPos()).isEqualTo(length);
+ assertThat(wrapped.getPos()).isEqualTo(offset + length);
+ }
+ }
+
+ @Test
+ public void testReadByteArrayAtEnd() throws IOException {
+ long offset = 5;
+ long length = 10;
+ try (OffsetSeekableInputStream stream =
+ new OffsetSeekableInputStream(wrapped, offset, length)) {
+ stream.seek(length);
+ byte[] buffer = new byte[5];
+ int bytesRead = stream.read(buffer, 0, 5);
+ assertThat(bytesRead).isEqualTo(-1);
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
index 70d2555cf3..44afb082ef 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.OffsetSeekableInputStream;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
@@ -30,7 +31,6 @@ import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.utils.CompactedChangelogPathResolver;
import org.apache.paimon.utils.RoaringBitmap32;
-import java.io.EOFException;
import java.io.IOException;
/**
@@ -166,48 +166,4 @@ public class CompactedChangelogFormatReaderFactory
implements FormatReaderFactor
throw new UnsupportedOperationException();
}
}
-
- private static class OffsetSeekableInputStream extends SeekableInputStream
{
-
- private final SeekableInputStream wrapped;
- private final long offset;
- private final long length;
-
- private OffsetSeekableInputStream(SeekableInputStream wrapped, long
offset, long length)
- throws IOException {
- this.wrapped = wrapped;
- this.offset = offset;
- this.length = length;
- wrapped.seek(offset);
- }
-
- @Override
- public void seek(long desired) throws IOException {
- wrapped.seek(offset + desired);
- }
-
- @Override
- public long getPos() throws IOException {
- return wrapped.getPos() - offset;
- }
-
- @Override
- public int read() throws IOException {
- if (getPos() >= length) {
- throw new EOFException();
- }
- return wrapped.read();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- long realLen = Math.min(len, length - getPos());
- return wrapped.read(b, off, (int) realLen);
- }
-
- @Override
- public void close() throws IOException {
- wrapped.close();
- }
- }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
new file mode 100644
index 0000000000..f6b5345ebf
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.EmptyStatsExtractor;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** {@link FileFormat} for blob file. */
+public class BlobFileFormat extends FileFormat {
+
+ public BlobFileFormat() {
+ super(BlobFileFormatFactory.IDENTIFIER);
+ }
+
+ @Override
+ public FormatReaderFactory createReaderFactory(
+ RowType dataSchemaRowType,
+ RowType projectedRowType,
+ @Nullable List<Predicate> filters) {
+ return new BlobFormatReaderFactory();
+ }
+
+ @Override
+ public FormatWriterFactory createWriterFactory(RowType type) {
+ return new BlobFormatWriterFactory();
+ }
+
+ @Override
+ public void validateDataFields(RowType rowType) {
+ checkArgument(rowType.getFieldCount() == 1, "BlobFileFormat only
support one field.");
+ checkArgument(
+ rowType.getField(0).type().getTypeRoot() == DataTypeRoot.BLOB,
+ "BlobFileFormat only support blob type.");
+ }
+
+ @Override
+ public Optional<SimpleStatsExtractor> createStatsExtractor(
+ RowType type, SimpleColStatsCollector.Factory[] statsCollectors) {
+ // return a empty stats extractor to avoid stats calculation
+ return Optional.of(new EmptyStatsExtractor());
+ }
+
+ private static class BlobFormatWriterFactory implements
FormatWriterFactory {
+
+ @Override
+ public FormatWriter create(PositionOutputStream out, String
compression) {
+ return new BlobFormatWriter(out);
+ }
+ }
+
+ private static class BlobFormatReaderFactory implements
FormatReaderFactory {
+
+ @Override
+ public FileRecordReader<InternalRow> createReader(Context context)
throws IOException {
+ return new BlobFormatReader(
+ context.fileIO(), context.filePath(), context.fileSize(),
context.selection());
+ }
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
new file mode 100644
index 0000000000..84b0d9431e
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+
+/** Factory to create {@link BlobFileFormat}. */
+public class BlobFileFormatFactory implements FileFormatFactory {
+
+ public static final String IDENTIFIER = "blob";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public FileFormat create(FormatContext formatContext) {
+ return new BlobFileFormat();
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java
new file mode 100644
index 0000000000..85e98e3ab8
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.OffsetSeekableInputStream;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.IOUtils;
+
+import java.io.IOException;
+
+/** BlobRef is a reference to a blob file. */
+public class BlobFileRef implements Blob {
+
+ private final FileIO fileIO;
+ private final Path filePath;
+ private final long blobLength;
+ private final long blobOffset;
+
+ public BlobFileRef(FileIO fileIO, Path filePath, long blobLength, long
blobOffset) {
+ this.fileIO = fileIO;
+ this.filePath = filePath;
+ this.blobLength = blobLength;
+ this.blobOffset = blobOffset;
+ }
+
+ @Override
+ public byte[] toBytes() {
+ try {
+ return IOUtils.readFully(newInputStream(), true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public SeekableInputStream newInputStream() throws IOException {
+ SeekableInputStream in = fileIO.newInputStream(filePath);
+ // TODO validate Magic number?
+ return new OffsetSeekableInputStream(in, blobOffset + 4, blobLength -
16);
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
new file mode 100644
index 0000000000..4a1a1d6444
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.memory.BytesUtils;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.utils.DeltaVarintCompressor;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/** {@link FileRecordReader} for blob file. */
+public class BlobFormatReader implements FileRecordReader<InternalRow> {
+
+ private final FileIO fileIO;
+ private final Path filePath;
+ private final long[] blobLengths;
+ private final long[] blobOffsets;
+
+ private boolean returned;
+
+ public BlobFormatReader(
+ FileIO fileIO, Path filePath, long fileSize, @Nullable
RoaringBitmap32 selection)
+ throws IOException {
+ this.fileIO = fileIO;
+ this.filePath = filePath;
+ this.returned = false;
+ try (SeekableInputStream in = fileIO.newInputStream(filePath)) {
+ in.seek(fileSize - 5);
+ byte[] header = new byte[5];
+ IOUtils.readFully(in, header);
+ byte version = header[4];
+ if (version != 1) {
+ throw new IOException("Unsupported version: " + version);
+ }
+ int indexLength = BytesUtils.getInt(header, 0);
+
+ in.seek(fileSize - 5 - indexLength);
+ byte[] indexBytes = new byte[indexLength];
+ IOUtils.readFully(in, indexBytes);
+
+ long[] blobLengths = DeltaVarintCompressor.decompress(indexBytes);
+ long[] blobOffsets = new long[blobLengths.length];
+ long offset = 0;
+ for (int i = 0; i < blobLengths.length; i++) {
+ blobOffsets[i] = offset;
+ offset += blobLengths[i];
+ }
+
+ if (selection != null) {
+ int cardinality = (int) selection.getCardinality();
+ long[] newLengths = new long[cardinality];
+ long[] newOffsets = new long[cardinality];
+ Iterator<Integer> iterator = selection.iterator();
+ for (int i = 0; i < cardinality; i++) {
+ Integer next = iterator.next();
+ newLengths[i] = blobLengths[next];
+ newOffsets[i] = blobOffsets[next];
+ }
+ blobLengths = newLengths;
+ blobOffsets = newOffsets;
+ }
+
+ this.blobLengths = blobLengths;
+ this.blobOffsets = blobOffsets;
+ }
+ }
+
+ @Nullable
+ @Override
+ public FileRecordIterator<InternalRow> readBatch() throws IOException {
+ if (returned) {
+ return null;
+ }
+
+ returned = true;
+ return new FileRecordIterator<InternalRow>() {
+
+ int currentPosition = 0;
+
+ @Override
+ public long returnedPosition() {
+ return currentPosition;
+ }
+
+ @Override
+ public Path filePath() {
+ return filePath;
+ }
+
+ @Nullable
+ @Override
+ public InternalRow next() {
+ if (currentPosition >= blobLengths.length) {
+ return null;
+ }
+
+ BlobFileRef blobRef =
+ new BlobFileRef(
+ fileIO,
+ filePath,
+ blobLengths[currentPosition],
+ blobOffsets[currentPosition]);
+ currentPosition++;
+ return GenericRow.of(blobRef);
+ }
+
+ @Override
+ public void releaseBatch() {}
+ };
+ }
+
+ @Override
+ public void close() throws IOException {}
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
new file mode 100644
index 0000000000..a1b1c113d5
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.DeltaVarintCompressor;
+import org.apache.paimon.utils.LongArrayList;
+
+import java.io.IOException;
+import java.util.zip.CRC32;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.StreamUtils.intToLittleEndian;
+import static org.apache.paimon.utils.StreamUtils.longToLittleEndian;
+
+/** {@link FormatWriter} for blob file. */
+public class BlobFormatWriter implements FormatWriter {
+
+ public static final byte VERSION = 1;
+ public static final int MAGIC_NUMBER = 1481511375;
+ public static final byte[] MAGIC_NUMBER_BYTES =
intToLittleEndian(MAGIC_NUMBER);
+
+ private final PositionOutputStream out;
+ private final CRC32 crc32;
+ private final byte[] tmpBuffer;
+ private final LongArrayList lengths;
+
+ public BlobFormatWriter(PositionOutputStream out) {
+ this.out = out;
+ this.crc32 = new CRC32();
+ this.tmpBuffer = new byte[4096];
+ this.lengths = new LongArrayList(16);
+ }
+
+ @Override
+ public void addElement(InternalRow element) throws IOException {
+ checkArgument(element.getFieldCount() == 1, "BlobFormatWriter only
support one field.");
+ checkArgument(!element.isNullAt(0), "BlobFormatWriter only support
non-null blob.");
+ Blob blob = element.getBlob(0);
+
+ long previousPos = out.getPos();
+ crc32.reset();
+
+ write(MAGIC_NUMBER_BYTES);
+ try (SeekableInputStream in = blob.newInputStream()) {
+ int bytesRead = in.read(tmpBuffer);
+ while (bytesRead >= 0) {
+ write(tmpBuffer, bytesRead);
+ bytesRead = in.read(tmpBuffer);
+ }
+ }
+
+ long binLength = out.getPos() - previousPos + 12;
+ lengths.add(binLength);
+ byte[] lenBytes = longToLittleEndian(binLength);
+ write(lenBytes);
+ int crcValue = (int) crc32.getValue();
+ out.write(intToLittleEndian(crcValue));
+ }
+
+ private void write(byte[] bytes) throws IOException {
+ write(bytes, bytes.length);
+ }
+
+ private void write(byte[] bytes, int length) throws IOException {
+ crc32.update(bytes, 0, length);
+ out.write(bytes, 0, length);
+ }
+
+ @Override
+ public boolean reachTargetSize(boolean suggestedCheck, long targetSize)
throws IOException {
+ // check target size every record
+ // Each blob is very large, so the cost of check is not high
+ return out.getPos() >= targetSize;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // index
+ byte[] indexBytes = DeltaVarintCompressor.compress(lengths.toArray());
+ out.write(indexBytes);
+ // header
+ out.write(intToLittleEndian(indexBytes.length));
+ out.write(VERSION);
+ }
+}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
new file mode 100644
index 0000000000..d30a669a29
--- /dev/null
+++
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.blob;
+
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BlobFileFormat}. */
+public class BlobFileFormatTest {
+
+ @TempDir java.nio.file.Path tempPath;
+
+ protected FileIO fileIO;
+ protected Path file;
+ protected Path parent;
+
+ @BeforeEach
+ public void beforeEach() {
+ this.fileIO = LocalFileIO.create();
+ this.parent = new Path(tempPath.toUri());
+ this.file = new Path(new Path(tempPath.toUri()),
UUID.randomUUID().toString());
+ }
+
+ @Test
+ public void test() throws IOException {
+ BlobFileFormat format = new BlobFileFormat();
+ RowType rowType = RowType.of(DataTypes.BLOB());
+
+ // write
+ FormatWriterFactory writerFactory =
format.createWriterFactory(rowType);
+ List<byte[]> blobs = Arrays.asList("hello".getBytes(),
"world".getBytes());
+ try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
+ FormatWriter formatWriter = writerFactory.create(out, null);
+ for (byte[] bytes : blobs) {
+ formatWriter.addElement(GenericRow.of(new BlobData(bytes)));
+ }
+ formatWriter.close();
+ }
+
+ // read
+ FormatReaderFactory readerFactory = format.createReaderFactory(null,
null, null);
+ FormatReaderContext context =
+ new FormatReaderContext(fileIO, file,
fileIO.getFileSize(file));
+ List<byte[]> result = new ArrayList<>();
+ readerFactory
+ .createReader(context)
+ .forEachRemaining(row -> result.add(row.getBlob(0).toBytes()));
+
+ // assert
+ assertThat(result).containsExactlyElementsOf(blobs);
+
+ // read with selection
+ RoaringBitmap32 selection = new RoaringBitmap32();
+ selection.add(1);
+ context = new FormatReaderContext(fileIO, file,
fileIO.getFileSize(file), selection);
+ result.clear();
+ readerFactory
+ .createReader(context)
+ .forEachRemaining(row -> result.add(row.getBlob(0).toBytes()));
+
+ // assert
+ assertThat(result).containsOnly(blobs.get(1));
+ }
+}