This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d54c762118 [core] Introduce withBlobConsumer in TableWrite (#7074)
d54c762118 is described below

commit d54c762118394412dadcb047502c7dea1a65b11b
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jan 19 21:48:37 2026 +0800

    [core] Introduce withBlobConsumer in TableWrite (#7074)
---
 .../java/org/apache/paimon/data/BlobConsumer.java  |  30 ++++++
 .../paimon/format/FileAwareFormatWriter.java       |  31 ++++++
 .../org/apache/paimon/append/AppendOnlyWriter.java |   9 +-
 .../paimon/append/RollingBlobFileWriter.java       |  19 ++--
 .../apache/paimon/io/RollingFileWriterImpl.java    |   2 +-
 .../org/apache/paimon/io/SingleFileWriter.java     |  29 +++--
 .../paimon/operation/AbstractFileStoreWrite.java   |   6 ++
 .../paimon/operation/BaseAppendFileStoreWrite.java |  11 +-
 .../apache/paimon/operation/FileStoreWrite.java    |   3 +
 .../paimon/table/format/FormatTableWrite.java      |  11 +-
 .../org/apache/paimon/table/sink/TableWrite.java   |  15 +--
 .../apache/paimon/table/sink/TableWriteImpl.java   |   7 ++
 .../apache/paimon/append/AppendOnlyWriterTest.java |   3 +-
 .../org/apache/paimon/append/BlobTableTest.java    |  28 +++++
 .../paimon/append/RollingBlobFileWriterTest.java   |  20 ++--
 .../apache/paimon/format/FileFormatSuffixTest.java | 117 ---------------------
 .../paimon/io/KeyValueFileReadWriteTest.java       |  91 +++++++++++++---
 .../apache/paimon/format/blob/BlobFileFormat.java  |  11 +-
 .../paimon/format/blob/BlobFormatReader.java       |   4 +-
 .../paimon/format/blob/BlobFormatWriter.java       |  35 +++++-
 20 files changed, 302 insertions(+), 180 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobConsumer.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobConsumer.java
new file mode 100644
index 0000000000..13ed1d3b37
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobConsumer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+/** A consumer for blob. */
+public interface BlobConsumer {
+
+    /**
+     * Accept a blob descriptor.
+     *
+     * @return Whether to flush to output stream.
+     */
+    boolean accept(BlobDescriptor blobDescriptor);
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileAwareFormatWriter.java b/paimon-common/src/main/java/org/apache/paimon/format/FileAwareFormatWriter.java
new file mode 100644
index 0000000000..0259fb72e2
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileAwareFormatWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.fs.Path;
+
+/** A {@link FormatWriter} that can be aware of the file. */
+public interface FileAwareFormatWriter extends FormatWriter {
+
+    /** Set the path of the file. */
+    void setFile(Path file);
+
+    /** Whether to delete the file upon abort. */
+    boolean deleteFileUponAbort();
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 14e21e2265..61abbb0cb0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.RowBuffer;
@@ -92,6 +93,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     @Nullable private final IOManager ioManager;
     private final FileIndexOptions fileIndexOptions;
     private final MemorySize maxDiskSize;
+    @Nullable private final BlobConsumer blobConsumer;
 
     @Nullable private CompactDeletionFile compactDeletionFile;
     private SinkWriter<InternalRow> sinkWriter;
@@ -120,7 +122,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
             MemorySize maxDiskSize,
             FileIndexOptions fileIndexOptions,
             boolean asyncFileWrite,
-            boolean statsDenseStore) {
+            boolean statsDenseStore,
+            @Nullable BlobConsumer blobConsumer) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
@@ -134,6 +137,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
         this.forceCompact = forceCompact;
         this.asyncFileWrite = asyncFileWrite;
         this.statsDenseStore = statsDenseStore;
+        this.blobConsumer = blobConsumer;
         this.newFiles = new ArrayList<>();
         this.deletedFiles = new ArrayList<>();
         this.compactBefore = new ArrayList<>();
@@ -311,7 +315,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
                     fileIndexOptions,
                     FileSource.APPEND,
                     asyncFileWrite,
-                    statsDenseStore);
+                    statsDenseStore,
+                    blobConsumer);
         }
         return new RowDataRollingFileWriter(
                 fileIO,
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index 8d511c6b91..583c0ee44b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.append;
 
+import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
@@ -44,6 +45,8 @@ import org.apache.paimon.utils.StatsCollectorFactories;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -91,7 +94,6 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
                             RollingFileWriterImpl<InternalRow, DataFileMeta>, 
List<DataFileMeta>>>
             blobWriterFactory;
     private final long targetFileSize;
-    private final long blobTargetFileSize;
 
     // State management
     private final List<FileWriterAbortExecutor> closedWriters;
@@ -118,11 +120,10 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
             FileIndexOptions fileIndexOptions,
             FileSource fileSource,
             boolean asyncFileWrite,
-            boolean statsDenseStore) {
-
+            boolean statsDenseStore,
+            @Nullable BlobConsumer blobConsumer) {
         // Initialize basic fields
         this.targetFileSize = targetFileSize;
-        this.blobTargetFileSize = blobTargetFileSize;
         this.results = new ArrayList<>();
         this.closedWriters = new ArrayList<>();
 
@@ -161,7 +162,8 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
                                 fileSource,
                                 asyncFileWrite,
                                 statsDenseStore,
-                                blobTargetFileSize);
+                                blobTargetFileSize,
+                                blobConsumer);
     }
 
     /** Creates a factory for normal data writers. */
@@ -221,9 +223,10 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
                     FileSource fileSource,
                     boolean asyncFileWrite,
                     boolean statsDenseStore,
-                    long targetFileSize) {
-
+                    long targetFileSize,
+                    @Nullable BlobConsumer blobConsumer) {
         BlobFileFormat blobFileFormat = new BlobFileFormat();
+        blobFileFormat.setWriteConsumer(blobConsumer);
         List<String> blobNames = blobType.getFieldNames();
 
         // Validate blob field count
@@ -375,7 +378,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
     /** Closes the main writer and returns its metadata. */
     private DataFileMeta closeMainWriter() throws IOException {
         currentWriter.close();
-        closedWriters.add(currentWriter.writer().abortExecutor());
+        currentWriter.writer().abortExecutor().ifPresent(closedWriters::add);
         return currentWriter.result();
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
index 7353ea1971..299a4f9742 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
@@ -129,7 +129,7 @@ public class RollingFileWriterImpl<T, R> implements RollingFileWriter<T, R> {
         // only store abort executor in memory
         // cannot store whole writer, it includes lots of memory for example 
column vectors to read
         // and write
-        closedWriters.add(currentWriter.abortExecutor());
+        currentWriter.abortExecutor().ifPresent(closedWriters::add);
         results.add(currentWriter.result());
         currentWriter = null;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index 5a810d716f..aea24be3ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -20,6 +20,7 @@ package org.apache.paimon.io;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.BundleFormatWriter;
+import org.apache.paimon.format.FileAwareFormatWriter;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SupportsDirectWrite;
@@ -36,6 +37,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Optional;
 import java.util.function.Function;
 
 /**
@@ -52,8 +54,9 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
     protected final Path path;
     private final Function<T, InternalRow> converter;
 
+    private boolean deleteFileUponAbort;
     private FormatWriter writer;
-    private PositionOutputStream out;
+    @Nullable private PositionOutputStream out;
 
     @Nullable private Long outputBytes;
     private long recordCount;
@@ -69,6 +72,8 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
         this.fileIO = fileIO;
         this.path = path;
         this.converter = converter;
+        // true first to clean file in exception
+        this.deleteFileUponAbort = true;
 
         try {
             if (factory instanceof SupportsDirectWrite) {
@@ -80,6 +85,12 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
                 }
                 writer = factory.create(out, compression);
             }
+
+            if (writer instanceof FileAwareFormatWriter) {
+                FileAwareFormatWriter fileAwareFormatWriter = 
(FileAwareFormatWriter) writer;
+                fileAwareFormatWriter.setFile(path);
+                deleteFileUponAbort = 
fileAwareFormatWriter.deleteFileUponAbort();
+            }
         } catch (IOException e) {
             LOG.warn(
                     "Failed to open the bulk writer, closing the output stream 
and throw the error.",
@@ -118,7 +129,7 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
             }
             recordCount += bundle.rowCount();
         } catch (Throwable e) {
-            LOG.warn("Exception occurs when writing file " + path + ". 
Cleaning up.", e);
+            LOG.warn("Exception occurs when writing file {}. Cleaning up.", 
path, e);
             abort();
             throw e;
         }
@@ -135,7 +146,7 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
             recordCount++;
             return rowData;
         } catch (Throwable e) {
-            LOG.warn("Exception occurs when writing file " + path + ". 
Cleaning up.", e);
+            LOG.warn("Exception occurs when writing file {}. Cleaning up.", 
path, e);
             abort();
             throw e;
         }
@@ -160,15 +171,13 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
             IOUtils.closeQuietly(out);
             out = null;
         }
-        fileIO.deleteQuietly(path);
+        abortExecutor().ifPresent(FileWriterAbortExecutor::abort);
     }
 
-    public FileWriterAbortExecutor abortExecutor() {
-        if (!closed) {
-            throw new RuntimeException("Writer should be closed!");
-        }
-
-        return new FileWriterAbortExecutor(fileIO, path);
+    public Optional<FileWriterAbortExecutor> abortExecutor() {
+        return deleteFileUponAbort
+                ? Optional.of(new FileWriterAbortExecutor(fileIO, path))
+                : Optional.empty();
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index ddf2addf53..f0e836a9ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -133,6 +134,11 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
         return this;
     }
 
+    @Override
+    public FileStoreWrite<T> withBlobConsumer(BlobConsumer blobConsumer) {
+        return this;
+    }
+
     @Override
     public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
         this.ignorePreviousFiles = ignorePreviousFiles;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 1c63f41b02..9eb49fee83 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.append.AppendOnlyWriter;
 import org.apache.paimon.append.cluster.Sorter;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
@@ -82,6 +83,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
     private @Nullable List<String> writeCols;
     private boolean forceBufferSpill = false;
     private boolean withBlob;
+    private @Nullable BlobConsumer blobConsumer;
 
     public BaseAppendFileStoreWrite(
             FileIO fileIO,
@@ -109,6 +111,12 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
         this.fileIndexOptions = options.indexColumnsOptions();
     }
 
+    @Override
+    public BaseAppendFileStoreWrite withBlobConsumer(BlobConsumer 
blobConsumer) {
+        this.blobConsumer = blobConsumer;
+        return this;
+    }
+
     @Override
     protected RecordWriter<InternalRow> createWriter(
             BinaryRow partition,
@@ -142,7 +150,8 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
                 options.writeBufferSpillDiskSize(),
                 fileIndexOptions,
                 options.asyncFileWrite(),
-                options.statsDenseStore());
+                options.statsDenseStore(),
+                blobConsumer);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index ce71b8cd2a..24cb9c1287 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.FileStore;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -64,6 +65,8 @@ public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State<
      */
     FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory 
memoryPoolFactory);
 
+    FileStoreWrite<T> withBlobConsumer(BlobConsumer blobConsumer);
+
     /**
      * Set whether the write operation should ignore previously stored files.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
index c74bf95e1a..2f49fa1585 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.format;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.casting.DefaultValueRow;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
@@ -43,14 +44,13 @@ import java.util.stream.Collectors;
 /** {@link TableWrite} implementation for format table. */
 public class FormatTableWrite implements BatchTableWrite {
 
-    private RowType rowType;
+    private final RowType rowType;
     private final FormatTableFileWriter write;
     private final FormatTableRowPartitionKeyExtractor partitionKeyExtractor;
 
     private final int[] notNullFieldIndex;
     private final @Nullable DefaultValueRow defaultValueRow;
     private final ProjectedRow projectedRow;
-    private final RowType writeRowType;
 
     public FormatTableWrite(
             FileIO fileIO,
@@ -68,7 +68,7 @@ public class FormatTableWrite implements BatchTableWrite {
                         .collect(Collectors.toList());
         this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
         this.defaultValueRow = DefaultValueRow.create(rowType);
-        this.writeRowType =
+        RowType writeRowType =
                 rowType.project(
                         rowType.getFieldNames().stream()
                                 .filter(name -> 
!partitionType.getFieldNames().contains(name))
@@ -117,6 +117,11 @@ public class FormatTableWrite implements BatchTableWrite {
         return this;
     }
 
+    @Override
+    public TableWrite withBlobConsumer(BlobConsumer blobConsumer) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public BatchTableWrite withIOManager(IOManager ioManager) {
         return this;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index a36f314cd6..536ebf856a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -20,11 +20,11 @@ package org.apache.paimon.table.sink;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.memory.MemoryPoolFactory;
-import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
@@ -43,13 +43,16 @@ public interface TableWrite extends AutoCloseable {
     /** Specified the write rowType. */
     TableWrite withWriteType(RowType writeType);
 
-    /** With {@link MemorySegmentPool} for the current table write. */
-    default TableWrite withMemoryPool(MemorySegmentPool memoryPool) {
-        return withMemoryPoolFactory(new MemoryPoolFactory(memoryPool));
-    }
-
+    /** With a shared {@link MemoryPoolFactory} for the current table write. */
     TableWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory);
 
+    /**
+     * With a blob consumer for the current table write, all newly written 
Blobs will be called back
+     * through this interface. Note that when this method is invoked, 
temporary files will not be
+     * deleted in case of exceptions. Please delete them yourself.
+     */
+    TableWrite withBlobConsumer(BlobConsumer blobConsumer);
+
     /** Calculate which partition {@code row} belongs to. */
     BinaryRow getPartition(InternalRow row);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index b5e9cebbb5..101a51e8a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.sink;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.casting.DefaultValueRow;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.BundleRecords;
@@ -128,6 +129,12 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
         return this;
     }
 
+    @Override
+    public TableWrite withBlobConsumer(BlobConsumer blobConsumer) {
+        write.withBlobConsumer(blobConsumer);
+        return this;
+    }
+
     public TableWriteImpl<T> withCompactExecutor(ExecutorService 
compactExecutor) {
         write.withCompactExecutor(compactExecutor);
         return this;
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 56426fce2e..bd31672d6a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -710,7 +710,8 @@ public class AppendOnlyWriterTest {
                         MemorySize.MAX_VALUE,
                         new FileIndexOptions(),
                         true,
-                        false);
+                        false,
+                        null);
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize()));
         return Pair.of(writer, compactManager.allFiles());
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 5bc9105e77..c393ed9017 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -22,25 +22,32 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.DataEvolutionSplitRead;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.UriReader;
 
 import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nonnull;
 
 import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -54,6 +61,27 @@ public class BlobTableTest extends TableTestBase {
 
     private final byte[] blobBytes = randomBytes();
 
+    @Test
+    public void testBlobConsumer() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        List<BlobDescriptor> blobs = new ArrayList<>();
+        try (BatchTableWrite write = table.newBatchWriteBuilder().newWrite()) {
+            write.withBlobConsumer(blobs::add);
+            write.write(dataDefault(0, 0));
+            write.write(dataDefault(0, 0));
+        }
+        assertThat(blobs.size()).isEqualTo(2);
+        FileIO fileIO = table.fileIO();
+        UriReader uriReader = UriReader.fromFile(fileIO);
+        for (BlobDescriptor blob : blobs) {
+            assertThat(Blob.fromDescriptor(uriReader, 
blob).toData()).isEqualTo(blobBytes);
+        }
+        for (BlobDescriptor blob : blobs) {
+            fileIO.deleteQuietly(new Path(blob.uri()));
+        }
+    }
+
     @Test
     public void testBasic() throws Exception {
         createTableDefault();
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
index e2e68905c8..8fe9f193dc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -105,8 +105,8 @@ public class RollingBlobFileWriterTest {
                         new FileIndexOptions(),
                         FileSource.APPEND,
                         false, // asyncFileWrite
-                        false // statsDenseStore
-                        );
+                        false, // statsDenseStore
+                        null);
     }
 
     @Test
@@ -205,8 +205,8 @@ public class RollingBlobFileWriterTest {
                         new FileIndexOptions(),
                         FileSource.APPEND,
                         false, // asyncFileWrite
-                        false // statsDenseStore
-                        );
+                        false, // statsDenseStore
+                        null);
 
         // Create large blob data that will exceed the blob target file size
         byte[] largeBlobData = new byte[3 * 1024 * 1024]; // 3 MB blob data
@@ -282,8 +282,8 @@ public class RollingBlobFileWriterTest {
                         new FileIndexOptions(),
                         FileSource.APPEND,
                         false, // asyncFileWrite
-                        false // statsDenseStore
-                        );
+                        false, // statsDenseStore
+                        null);
 
         // Create blob data that will trigger rolling
         byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
@@ -361,8 +361,8 @@ public class RollingBlobFileWriterTest {
                         new FileIndexOptions(),
                         FileSource.APPEND,
                         false, // asyncFileWrite
-                        false // statsDenseStore
-                        );
+                        false, // statsDenseStore
+                        null);
 
         // Create blob data that will trigger rolling (non-descriptor mode: 
direct blob data)
         byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
@@ -579,8 +579,8 @@ public class RollingBlobFileWriterTest {
                         new FileIndexOptions(),
                         FileSource.APPEND,
                         false, // asyncFileWrite
-                        false // statsDenseStore
-                        );
+                        false, // statsDenseStore
+                        null);
 
         // Write data
         for (int i = 0; i < 3; i++) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
deleted file mode 100644
index 7d1b4f502f..0000000000
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.append.AppendOnlyWriter;
-import org.apache.paimon.append.BucketedAppendCompactManager;
-import org.apache.paimon.compression.CompressOptions;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.KeyValueFileReadWriteTest;
-import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.memory.HeapMemorySegmentPool;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.CommitIncrement;
-import org.apache.paimon.utils.StatsCollectorFactories;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.util.Collections;
-import java.util.LinkedList;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** test file format suffix. */
-public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
-
-    private static final RowType SCHEMA =
-            RowType.of(
-                    new DataType[] {new IntType(), new VarCharType(), new 
VarCharType()},
-                    new String[] {"id", "name", "dt"});
-
-    @Test
-    public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws 
Exception {
-        String format = "avro";
-        KeyValueFileWriterFactory writerFactory = 
createWriterFactory(tempDir.toString(), format);
-        Path path = writerFactory.pathFactory(0).newPath();
-        assertThat(path.toString().endsWith(format)).isTrue();
-
-        DataFilePathFactory dataFilePathFactory =
-                new DataFilePathFactory(
-                        new Path(tempDir + "/dt=1/bucket-1"),
-                        format,
-                        CoreOptions.DATA_FILE_PREFIX.defaultValue(),
-                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
-                        CoreOptions.FILE_COMPRESSION.defaultValue(),
-                        null);
-        FileFormat fileFormat = FileFormat.fromIdentifier(format, new 
Options());
-        LinkedList<DataFileMeta> toCompact = new LinkedList<>();
-        CoreOptions options =
-                new 
CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
-        AppendOnlyWriter appendOnlyWriter =
-                new AppendOnlyWriter(
-                        LocalFileIO.create(),
-                        IOManager.create(tempDir.toString()),
-                        0,
-                        fileFormat,
-                        10,
-                        10,
-                        SCHEMA,
-                        null,
-                        0,
-                        new BucketedAppendCompactManager(
-                                null, toCompact, null, 4, 10, false, null, 
null), // not used
-                        null,
-                        false,
-                        dataFilePathFactory,
-                        null,
-                        false,
-                        false,
-                        CoreOptions.FILE_COMPRESSION.defaultValue(),
-                        CompressOptions.defaultOptions(),
-                        new StatsCollectorFactories(options),
-                        MemorySize.MAX_VALUE,
-                        new FileIndexOptions(),
-                        true,
-                        false);
-        appendOnlyWriter.setMemoryPool(
-                new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize()));
-        appendOnlyWriter.write(
-                GenericRow.of(1, BinaryString.fromString("aaa"), 
BinaryString.fromString("1")));
-        CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
-        appendOnlyWriter.close();
-
-        DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0);
-        assertThat(meta.fileName().endsWith(format)).isTrue();
-    }
-}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 025f5849c5..f5c26445bf 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -22,10 +22,17 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueSerializerTest;
 import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.append.AppendOnlyWriter;
+import org.apache.paimon.append.BucketedAppendCompactManager;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FlushingFileFormat;
 import org.apache.paimon.format.SimpleColStats;
 import org.apache.paimon.fs.FileIO;
@@ -34,14 +41,21 @@ import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.stats.StatsTestUtils;
 import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsCollectorFactories;
 
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
@@ -50,8 +64,10 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -224,38 +240,83 @@ public class KeyValueFileReadWriteTest {
                                                 kv.value().getInt(1))));
     }
 
-    protected KeyValueFileWriterFactory createWriterFactory(String pathStr, 
String format) {
-        Path path = new Path(pathStr);
-        FileStorePathFactory pathFactory =
-                new FileStorePathFactory(
-                        path,
-                        RowType.of(),
-                        CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
+    @Test
+    public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws 
Exception {
+        RowType schema =
+                RowType.of(
+                        new DataType[] {new IntType(), new VarCharType(), new 
VarCharType()},
+                        new String[] {"id", "name", "dt"});
+
+        String format = "avro";
+        KeyValueFileWriterFactory writerFactory = 
createWriterFactory(tempDir.toString(), format);
+        Path path = writerFactory.pathFactory(0).newPath();
+        assertThat(path.toString().endsWith(format)).isTrue();
+
+        DataFilePathFactory dataFilePathFactory =
+                new DataFilePathFactory(
+                        new Path(tempDir + "/dt=1/bucket-1"),
                         format,
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                         CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                        
CoreOptions.PARTITION_GENERATE_LEGACY_NAME.defaultValue(),
                         
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        null);
+        FileFormat fileFormat = FileFormat.fromIdentifier(format, new 
Options());
+        LinkedList<DataFileMeta> toCompact = new LinkedList<>();
+        CoreOptions options =
+                new 
CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
+        AppendOnlyWriter appendOnlyWriter =
+                new AppendOnlyWriter(
+                        LocalFileIO.create(),
+                        IOManager.create(tempDir.toString()),
+                        0,
+                        fileFormat,
+                        10,
+                        10,
+                        schema,
+                        null,
+                        0,
+                        new BucketedAppendCompactManager(
+                                null, toCompact, null, 4, 10, false, null, 
null), // not used
                         null,
+                        false,
+                        dataFilePathFactory,
                         null,
-                        CoreOptions.ExternalPathStrategy.NONE,
+                        false,
+                        false,
+                        CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        CompressOptions.defaultOptions(),
+                        new StatsCollectorFactories(options),
+                        MemorySize.MAX_VALUE,
+                        new FileIndexOptions(),
+                        true,
                         false,
                         null);
+        appendOnlyWriter.setMemoryPool(
+                new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize()));
+        appendOnlyWriter.write(
+                GenericRow.of(1, BinaryString.fromString("aaa"), 
BinaryString.fromString("1")));
+        CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
+        appendOnlyWriter.close();
+
+        DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0);
+        assertThat(meta.fileName().endsWith(format)).isTrue();
+    }
+
+    protected KeyValueFileWriterFactory createWriterFactory(String pathStr, 
String format) {
+        Path path = new Path(pathStr);
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 
1024;
         FileIO fileIO = FileIOFinder.find(path);
         Options options = new Options();
         options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
 
         Function<String, FileStorePathFactory> pathFactoryMap =
-                new Function<String, FileStorePathFactory>() {
-                    @Override
-                    public FileStorePathFactory apply(String format) {
-                        return new FileStorePathFactory(
+                format1 ->
+                        new FileStorePathFactory(
                                 path,
                                 RowType.of(),
                                 
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
-                                format,
+                                format1,
                                 CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                                 
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
                                 
CoreOptions.PARTITION_GENERATE_LEGACY_NAME.defaultValue(),
@@ -266,8 +327,6 @@ public class KeyValueFileReadWriteTest {
                                 CoreOptions.ExternalPathStrategy.NONE,
                                 false,
                                 null);
-                    }
-                };
 
         return KeyValueFileWriterFactory.builder(
                         fileIO,
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
index 78f06b486b..06ebc18b6f 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.format.blob;
 
+import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.EmptyStatsExtractor;
 import org.apache.paimon.format.FileFormat;
@@ -49,6 +50,8 @@ public class BlobFileFormat extends FileFormat {
 
     private final boolean blobAsDescriptor;
 
+    @Nullable public BlobConsumer writeConsumer;
+
     public BlobFileFormat() {
         this(false);
     }
@@ -62,6 +65,10 @@ public class BlobFileFormat extends FileFormat {
         return fileName.endsWith("." + BlobFileFormatFactory.IDENTIFIER);
     }
 
+    public void setWriteConsumer(@Nullable BlobConsumer writeConsumer) {
+        this.writeConsumer = writeConsumer;
+    }
+
     @Override
     public FormatReaderFactory createReaderFactory(
             RowType dataSchemaRowType,
@@ -90,11 +97,11 @@ public class BlobFileFormat extends FileFormat {
         return Optional.of(new EmptyStatsExtractor());
     }
 
-    private static class BlobFormatWriterFactory implements 
FormatWriterFactory {
+    private class BlobFormatWriterFactory implements FormatWriterFactory {
 
         @Override
         public FormatWriter create(PositionOutputStream out, String 
compression) {
-            return new BlobFormatWriter(out);
+            return new BlobFormatWriter(out, writeConsumer);
         }
     }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index bed455fda7..b06fb05d96 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -37,6 +37,7 @@ public class BlobFormatReader implements 
FileRecordReader<InternalRow> {
 
     private final FileIO fileIO;
     private final Path filePath;
+    private final String filePathString;
     private final BlobFileMeta fileMeta;
     private final @Nullable SeekableInputStream in;
 
@@ -46,6 +47,7 @@ public class BlobFormatReader implements 
FileRecordReader<InternalRow> {
             FileIO fileIO, Path filePath, BlobFileMeta fileMeta, @Nullable 
SeekableInputStream in) {
         this.fileIO = fileIO;
         this.filePath = filePath;
+        this.filePathString = filePath.toString();
         this.fileMeta = fileMeta;
         this.in = in;
         this.returned = false;
@@ -86,7 +88,7 @@ public class BlobFormatReader implements 
FileRecordReader<InternalRow> {
                 if (in != null) {
                     blob = Blob.fromData(readInlineBlob(in, offset, length));
                 } else {
-                    blob = Blob.fromFile(fileIO, filePath.toString(), offset, 
length);
+                    blob = Blob.fromFile(fileIO, filePathString, offset, 
length);
                 }
                 currentPosition++;
                 return GenericRow.of(blob);
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
index a1b1c113d5..4198edc27e 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
@@ -19,13 +19,19 @@
 package org.apache.paimon.format.blob;
 
 import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobConsumer;
+import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileAwareFormatWriter;
 import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.utils.DeltaVarintCompressor;
 import org.apache.paimon.utils.LongArrayList;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.zip.CRC32;
 
@@ -34,24 +40,38 @@ import static 
org.apache.paimon.utils.StreamUtils.intToLittleEndian;
 import static org.apache.paimon.utils.StreamUtils.longToLittleEndian;
 
 /** {@link FormatWriter} for blob file. */
-public class BlobFormatWriter implements FormatWriter {
+public class BlobFormatWriter implements FileAwareFormatWriter {
 
     public static final byte VERSION = 1;
     public static final int MAGIC_NUMBER = 1481511375;
     public static final byte[] MAGIC_NUMBER_BYTES = 
intToLittleEndian(MAGIC_NUMBER);
 
     private final PositionOutputStream out;
+    @Nullable private final BlobConsumer writeConsumer;
     private final CRC32 crc32;
     private final byte[] tmpBuffer;
     private final LongArrayList lengths;
 
-    public BlobFormatWriter(PositionOutputStream out) {
+    private String pathString;
+
+    public BlobFormatWriter(PositionOutputStream out, @Nullable BlobConsumer 
writeConsumer) {
         this.out = out;
+        this.writeConsumer = writeConsumer;
         this.crc32 = new CRC32();
         this.tmpBuffer = new byte[4096];
         this.lengths = new LongArrayList(16);
     }
 
+    @Override
+    public void setFile(Path file) {
+        this.pathString = file.toString();
+    }
+
+    @Override
+    public boolean deleteFileUponAbort() {
+        return writeConsumer == null;
+    }
+
     @Override
     public void addElement(InternalRow element) throws IOException {
         checkArgument(element.getFieldCount() == 1, "BlobFormatWriter only 
support one field.");
@@ -62,6 +82,8 @@ public class BlobFormatWriter implements FormatWriter {
         crc32.reset();
 
         write(MAGIC_NUMBER_BYTES);
+
+        long blobPos = out.getPos();
         try (SeekableInputStream in = blob.newInputStream()) {
             int bytesRead = in.read(tmpBuffer);
             while (bytesRead >= 0) {
@@ -70,12 +92,21 @@ public class BlobFormatWriter implements FormatWriter {
             }
         }
 
+        long blobLength = out.getPos() - blobPos;
         long binLength = out.getPos() - previousPos + 12;
         lengths.add(binLength);
         byte[] lenBytes = longToLittleEndian(binLength);
         write(lenBytes);
         int crcValue = (int) crc32.getValue();
         out.write(intToLittleEndian(crcValue));
+
+        if (writeConsumer != null) {
+            BlobDescriptor descriptor = new BlobDescriptor(pathString, 
blobPos, blobLength);
+            boolean flush = writeConsumer.accept(descriptor);
+            if (flush) {
+                out.flush();
+            }
+        }
     }
 
     private void write(byte[] bytes) throws IOException {

Reply via email to